Introduces fs_err to snapshot_utils (#32266)

This commit is contained in:
Brooks 2023-06-26 13:14:27 -04:00 committed by GitHub
parent 5f1b5b877a
commit 95fae285a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 56 deletions

7
Cargo.lock generated
View File

@ -1893,6 +1893,12 @@ dependencies = [
"percent-encoding 2.3.0",
]
[[package]]
name = "fs-err"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541"
[[package]]
name = "fs_extra"
version = "1.3.0"
@ -6639,6 +6645,7 @@ dependencies = [
"ed25519-dalek",
"flate2",
"fnv",
"fs-err",
"im",
"index_list",
"itertools",

View File

@ -190,6 +190,7 @@ fd-lock = "3.0.12"
flate2 = "1.0.26"
fnv = "1.0.7"
fs_extra = "1.3.0"
fs-err = "2.9.0"
futures = "0.3.28"
futures-util = "0.3.28"
gag = "1.0.0"

View File

@ -1633,6 +1633,12 @@ dependencies = [
"percent-encoding 2.3.0",
]
[[package]]
name = "fs-err"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541"
[[package]]
name = "fs_extra"
version = "1.3.0"
@ -5427,6 +5433,7 @@ dependencies = [
"dir-diff",
"flate2",
"fnv",
"fs-err",
"im",
"index_list",
"itertools",

View File

@ -22,6 +22,7 @@ dashmap = { workspace = true, features = ["rayon", "raw-api"] }
dir-diff = { workspace = true }
flate2 = { workspace = true }
fnv = { workspace = true }
fs-err = { workspace = true }
im = { workspace = true, features = ["rayon", "serde"] }
index_list = { workspace = true }
itertools = { workspace = true }

View File

@ -36,6 +36,7 @@ use {
bzip2::bufread::BzDecoder,
crossbeam_channel::Sender,
flate2::read::GzDecoder,
fs_err,
lazy_static::lazy_static,
log::*,
rayon::prelude::*,
@ -589,16 +590,16 @@ pub fn clean_orphaned_account_snapshot_dirs(
for snapshot in snapshots {
let account_hardlinks_dir = snapshot.snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
// loop through entries in the snapshot_hardlink_dir, read the symlinks, add the target to the HashSet
for entry in fs::read_dir(&account_hardlinks_dir)? {
for entry in fs_err::read_dir(&account_hardlinks_dir)? {
let path = entry?.path();
let target = fs::read_link(&path)?;
let target = fs_err::read_link(&path)?;
account_snapshot_dirs_referenced.insert(target);
}
}
// loop through the account snapshot hardlink directories, if the directory is not in the account_snapshot_dirs_referenced set, delete it
for account_snapshot_path in account_snapshot_paths {
for entry in fs::read_dir(account_snapshot_path)? {
for entry in fs_err::read_dir(account_snapshot_path)? {
let path = entry?.path();
if !account_snapshot_dirs_referenced.contains(&path) {
info!(
@ -775,7 +776,7 @@ pub fn archive_snapshot_package(
// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The file path where the AppendVec will be placed in the staging directory.
let storage_path =
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
fs_err::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_file(storage_path, &output_path)
.map_err(|e| SnapshotError::IoWithSource(e, "create storage symlink"))?;
if !output_path.is_file() {
@ -794,7 +795,7 @@ pub fn archive_snapshot_package(
));
{
let mut archive_file = fs::File::create(&archive_path)?;
let mut archive_file = fs_err::File::create(&archive_path)?;
let do_archive_files = |encoder: &mut dyn Write| -> Result<()> {
let mut archive = tar::Builder::new(encoder);
@ -1098,7 +1099,7 @@ fn create_snapshot_data_file_stream(
snapshot_root_file_path: impl AsRef<Path>,
maximum_file_size: u64,
) -> Result<(u64, BufReader<File>)> {
let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len();
let snapshot_file_size = fs_err::metadata(&snapshot_root_file_path)?.len();
if snapshot_file_size > maximum_file_size {
let error_message =
@ -1154,11 +1155,11 @@ pub fn create_accounts_run_and_snapshot_dirs(
// to this new version using run and snapshot directories.
// The run/ content cleanup will be done at a later point. The snapshot/ content persists
// across the process boot, and will be purged by the account_background_service.
if fs::remove_dir_all(&account_dir).is_err() {
if fs_err::remove_dir_all(&account_dir).is_err() {
delete_contents_of_path(&account_dir);
}
fs::create_dir_all(&run_path)?;
fs::create_dir_all(&snapshot_path)?;
fs_err::create_dir_all(&run_path)?;
fs_err::create_dir_all(&snapshot_path)?;
}
Ok((run_path, snapshot_path))
@ -1258,7 +1259,7 @@ fn hard_link_storages_to_snapshot(
snapshot_storages: &[Arc<AccountStorageEntry>],
) -> Result<()> {
let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
fs::create_dir_all(&accounts_hardlinks_dir)?;
fs_err::create_dir_all(&accounts_hardlinks_dir)?;
let mut account_paths: HashSet<PathBuf> = HashSet::new();
for storage in snapshot_storages {
@ -1308,7 +1309,7 @@ pub fn add_bank_snapshot(
"A bank snapshot already exists for slot {slot}!? Path: {}",
bank_snapshot_dir.display()
);
fs::create_dir_all(&bank_snapshot_dir)?;
fs_err::create_dir_all(&bank_snapshot_dir)?;
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
let bank_snapshot_path = bank_snapshot_dir
@ -1354,7 +1355,7 @@ pub fn add_bank_snapshot(
// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::File::create(state_complete_path)?;
fs_err::File::create(state_complete_path)?;
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
datapoint_info!(
@ -1958,10 +1959,10 @@ fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Pat
.path();
let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
fs_err::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::hard_link(
fs_err::hard_link(
status_cache_file,
slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
)?;
@ -2043,7 +2044,7 @@ fn streaming_snapshot_dir_files(
file_sender.send(snapshot_version_path.into())?;
for account_path in account_paths {
for file in fs::read_dir(account_path)? {
for file in fs_err::read_dir(account_path)? {
file_sender.send(file?.path())?;
}
}
@ -2148,7 +2149,7 @@ fn build_storage_from_snapshot_dir(
/// threshold, it is not opened and an error is returned.
fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
// Check file size.
let file_size = fs::metadata(&path)?.len();
let file_size = fs_err::metadata(&path)?.len();
if file_size > MAX_SNAPSHOT_VERSION_FILE_SIZE {
let error_message = format!(
"snapshot version file too large: {} has {} bytes (max size is {} bytes)",
@ -2310,13 +2311,13 @@ where
F: Fn(PathBuf) -> Result<T>,
{
let walk_dir = |dir: &Path| -> Vec<T> {
let entry_iter = fs::read_dir(dir);
let entry_iter = fs_err::read_dir(dir);
match entry_iter {
Err(err) => {
info!(
"Unable to read snapshot archives directory: err: {}, path: {}",
"Unable to read snapshot archives directory {}: {}",
dir.display(),
err,
dir.display()
);
vec![]
}
@ -2445,7 +2446,7 @@ pub fn purge_old_snapshot_archives(
fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
for path in archives.iter().map(|a| a.path()) {
trace!("Removing snapshot archive: {}", path.display());
fs::remove_file(path)
fs_err::remove_file(path)
.unwrap_or_else(|err| info!("Failed to remove {}: {}", path.display(), err));
}
}
@ -2949,7 +2950,7 @@ pub fn verify_snapshot_archive(
// collect all the appendvecs in account_paths/<slot>/snapshot/ into one directory for later comparison.
let storages_to_verify = unpack_dir.join("storages_to_verify");
// Create the directory if it doesn't exist
std::fs::create_dir_all(&storages_to_verify).unwrap();
fs_err::create_dir_all(&storages_to_verify).unwrap();
let slot = slot.to_string();
let snapshot_slot_dir = snapshots_to_verify.as_ref().join(&slot);
@ -2959,8 +2960,8 @@ pub fn verify_snapshot_archive(
let p1 = snapshots_to_verify.as_ref().join(&slot).join(&slot);
let p2 = unpacked_snapshots.join(&slot).join(&slot);
assert!(crate::serde_snapshot::compare_two_serialized_banks(&p1, &p2).unwrap());
std::fs::remove_file(p1).unwrap();
std::fs::remove_file(p2).unwrap();
fs_err::remove_file(p1).unwrap();
fs_err::remove_file(p2).unwrap();
}
// The new status_cache file is inside the slot directory together with the snapshot file.
@ -2973,7 +2974,7 @@ pub fn verify_snapshot_archive(
let new_unpacked_status_cache_file = unpacked_snapshots
.join(&slot)
.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::rename(
fs_err::rename(
existing_unpacked_status_cache_file,
new_unpacked_status_cache_file,
)
@ -2982,26 +2983,26 @@ pub fn verify_snapshot_archive(
let accounts_hardlinks_dir = snapshot_slot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
if accounts_hardlinks_dir.is_dir() {
// This directory contains symlinks to all <account_path>/snapshot/<slot> directories.
for entry in fs::read_dir(&accounts_hardlinks_dir).unwrap() {
let link_dst_path = fs::read_link(entry.unwrap().path()).unwrap();
for entry in fs_err::read_dir(&accounts_hardlinks_dir).unwrap() {
let link_dst_path = fs_err::read_link(entry.unwrap().path()).unwrap();
// Copy all the files in dst_path into the storages_to_verify directory.
for entry in fs::read_dir(&link_dst_path).unwrap() {
for entry in fs_err::read_dir(&link_dst_path).unwrap() {
let src_path = entry.unwrap().path();
let dst_path = storages_to_verify.join(src_path.file_name().unwrap());
fs::copy(src_path, dst_path).unwrap();
fs_err::copy(src_path, dst_path).unwrap();
}
}
std::fs::remove_dir_all(accounts_hardlinks_dir).unwrap();
fs_err::remove_dir_all(accounts_hardlinks_dir).unwrap();
}
let version_path = snapshot_slot_dir.join(SNAPSHOT_VERSION_FILENAME);
if version_path.is_file() {
std::fs::remove_file(version_path).unwrap();
fs_err::remove_file(version_path).unwrap();
}
let state_complete_path = snapshot_slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
if state_complete_path.is_file() {
std::fs::remove_file(state_complete_path).unwrap();
fs_err::remove_file(state_complete_path).unwrap();
}
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
@ -3011,7 +3012,7 @@ pub fn verify_snapshot_archive(
// Remove the empty "accounts" directory for the directory comparison below.
// In some test cases the directories to compare do not come from unarchiving.
// Ignore the error when this directory does not exist.
_ = std::fs::remove_dir(unpack_account_dir.join("accounts"));
_ = fs_err::remove_dir(unpack_account_dir.join("accounts"));
// Check the account entries are the same
assert!(!dir_diff::is_different(&storages_to_verify, unpack_account_dir).unwrap());
}
@ -3078,12 +3079,12 @@ fn purge_bank_snapshot(bank_snapshot_dir: impl AsRef<Path>) -> Result<()> {
if accounts_hardlinks_dir.is_dir() {
// This directory contains symlinks to all accounts snapshot directories.
// They should all be removed.
for accounts_hardlink_dir in fs::read_dir(accounts_hardlinks_dir)? {
let accounts_hardlink_dir = fs::read_link(accounts_hardlink_dir?.path())?;
for accounts_hardlink_dir in fs_err::read_dir(accounts_hardlinks_dir)? {
let accounts_hardlink_dir = fs_err::read_link(accounts_hardlink_dir?.path())?;
move_and_async_delete_path(&accounts_hardlink_dir);
}
}
fs::remove_dir_all(bank_snapshot_dir)?;
fs_err::remove_dir_all(bank_snapshot_dir)?;
Ok(())
}
@ -3811,7 +3812,7 @@ mod tests {
) {
for slot in min_slot..max_slot {
let snapshot_dir = get_bank_snapshot_dir(bank_snapshots_dir, slot);
fs::create_dir_all(&snapshot_dir).unwrap();
fs_err::create_dir_all(&snapshot_dir).unwrap();
let snapshot_filename = get_snapshot_file_name(slot);
let snapshot_path = snapshot_dir.join(snapshot_filename);
@ -3825,7 +3826,7 @@ mod tests {
// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::File::create(state_complete_path).unwrap();
fs_err::File::create(state_complete_path).unwrap();
}
}
@ -3865,8 +3866,8 @@ mod tests {
min_incremental_snapshot_slot: Slot,
max_incremental_snapshot_slot: Slot,
) {
fs::create_dir_all(full_snapshot_archives_dir).unwrap();
fs::create_dir_all(incremental_snapshot_archives_dir).unwrap();
fs_err::create_dir_all(full_snapshot_archives_dir).unwrap();
fs_err::create_dir_all(incremental_snapshot_archives_dir).unwrap();
for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
for incremental_snapshot_slot in
min_incremental_snapshot_slot..max_incremental_snapshot_slot
@ -4082,7 +4083,7 @@ mod tests {
);
let mut retained_snaps = HashSet::new();
for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
for entry in fs_err::read_dir(temp_snap_dir.path()).unwrap() {
let entry_path_buf = entry.unwrap().path();
let entry_path = entry_path_buf.as_path();
let snapshot_name = entry_path
@ -5215,15 +5216,15 @@ mod tests {
let accounts_hardlinks_dir = get_bank_snapshot_dir(&bank_snapshots_dir, bank.slot())
.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
assert!(fs::metadata(&accounts_hardlinks_dir).is_ok());
assert!(fs_err::metadata(&accounts_hardlinks_dir).is_ok());
let mut hardlink_dirs: Vec<PathBuf> = Vec::new();
// This directory contains symlinks to all accounts snapshot directories.
for entry in fs::read_dir(accounts_hardlinks_dir).unwrap() {
for entry in fs_err::read_dir(accounts_hardlinks_dir).unwrap() {
let entry = entry.unwrap();
let symlink = entry.path();
let dst_path = fs::read_link(symlink).unwrap();
assert!(fs::metadata(&dst_path).is_ok());
let dst_path = fs_err::read_link(symlink).unwrap();
assert!(fs_err::metadata(&dst_path).is_ok());
hardlink_dirs.push(dst_path);
}
@ -5231,7 +5232,9 @@ mod tests {
assert!(purge_bank_snapshot(bank_snapshot_dir).is_ok());
// When the bank snapshot is removed, all the snapshot hardlink directories should be removed.
assert!(hardlink_dirs.iter().all(|dir| fs::metadata(dir).is_err()));
assert!(hardlink_dirs
.iter()
.all(|dir| fs_err::metadata(dir).is_err()));
}
#[test]
@ -5243,7 +5246,7 @@ mod tests {
let bank_snapshots_dir_tmp = tempfile::TempDir::new().unwrap();
let bank_snapshot_dir = bank_snapshots_dir_tmp.path().join(slot.to_string());
let accounts_hardlinks_dir = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
fs::create_dir_all(&accounts_hardlinks_dir).unwrap();
fs_err::create_dir_all(&accounts_hardlinks_dir).unwrap();
let (_tmp_dir, accounts_dir) = create_tmp_accounts_dir_for_tests();
let appendvec_filename = format!("{slot}.0");
@ -5283,7 +5286,7 @@ mod tests {
assert_eq!(snapshot.slot, 4);
let complete_flag_file = snapshot.snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::remove_file(complete_flag_file).unwrap();
fs_err::remove_file(complete_flag_file).unwrap();
// The incomplete snapshot dir should still exist
let snapshot_dir_4 = snapshot.snapshot_dir;
assert!(snapshot_dir_4.exists());
@ -5291,12 +5294,12 @@ mod tests {
assert_eq!(snapshot.slot, 3);
let snapshot_version_file = snapshot.snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
fs::remove_file(snapshot_version_file).unwrap();
fs_err::remove_file(snapshot_version_file).unwrap();
let snapshot = get_highest_bank_snapshot(&bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 2);
let status_cache_file = snapshot.snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
fs::remove_file(status_cache_file).unwrap();
fs_err::remove_file(status_cache_file).unwrap();
let snapshot = get_highest_bank_snapshot(&bank_snapshots_dir).unwrap();
assert_eq!(snapshot.slot, 1);
}
@ -5340,21 +5343,21 @@ mod tests {
// the symlinks point to the account snapshot hardlink directories <account_path>/snapshot/<slot>/ for slot 2
// get them via read_link
let hardlink_dirs_slot_2: Vec<PathBuf> = fs::read_dir(accounts_link_dir_slot_2)
let hardlink_dirs_slot_2: Vec<PathBuf> = fs_err::read_dir(accounts_link_dir_slot_2)
.unwrap()
.map(|entry| {
let symlink = entry.unwrap().path();
fs::read_link(symlink).unwrap()
fs_err::read_link(symlink).unwrap()
})
.collect();
// remove the bank snapshot directory for slot 2, so the account snapshot slot 2 directories become orphaned
fs::remove_dir_all(snapshot_dir_slot_2).unwrap();
fs_err::remove_dir_all(snapshot_dir_slot_2).unwrap();
// verify the orphaned account snapshot hardlink directories are still there
assert!(hardlink_dirs_slot_2
.iter()
.all(|dir| fs::metadata(dir).is_ok()));
.all(|dir| fs_err::metadata(dir).is_ok()));
let account_snapshot_paths: Vec<PathBuf> = hardlink_dirs_slot_2
.iter()
@ -5366,7 +5369,7 @@ mod tests {
// verify the hardlink directories are gone
assert!(hardlink_dirs_slot_2
.iter()
.all(|dir| fs::metadata(dir).is_err()));
.all(|dir| fs_err::metadata(dir).is_err()));
}
#[test]
@ -5379,7 +5382,7 @@ mod tests {
for slot in [1, 2] {
let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
let state_complete_file = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::remove_file(state_complete_file).unwrap();
fs_err::remove_file(state_complete_file).unwrap();
}
purge_incomplete_bank_snapshots(&bank_snapshots_dir);
@ -5594,7 +5597,7 @@ mod tests {
// Verify that the next_append_vec_id tracking is correct
let mut max_id = 0;
for path in account_paths {
fs::read_dir(path).unwrap().for_each(|entry| {
fs_err::read_dir(path).unwrap().for_each(|entry| {
let path = entry.unwrap().path();
let filename = path.file_name().unwrap();
let (_slot, append_vec_id) = get_slot_and_append_vec_id(filename.to_str().unwrap());