Cleanup filesystem error handling in snapshot_utils (#32286)

This commit is contained in:
Brooks 2023-06-27 13:08:47 -04:00 committed by GitHub
parent 6b013f46eb
commit 13aff74f82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 88 additions and 145 deletions

View File

@ -54,7 +54,6 @@ use {
cmp::Ordering,
collections::{HashMap, HashSet},
fmt,
fs::{self, File},
io::{BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, Write},
num::NonZeroUsize,
path::{Path, PathBuf},
@ -450,43 +449,23 @@ pub fn create_and_canonicalize_directories(directories: &[PathBuf]) -> Result<Ve
/// to delete the top level directory it might be able to
/// delete the contents of that directory.
fn delete_contents_of_path(path: impl AsRef<Path>) {
if let Ok(dir_entries) = std::fs::read_dir(&path) {
for entry in dir_entries.flatten() {
let sub_path = entry.path();
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(err) => {
warn!(
"Failed to get metadata for {}. Error: {}",
sub_path.display(),
err.to_string()
);
break;
}
};
if metadata.is_dir() {
if let Err(err) = std::fs::remove_dir_all(&sub_path) {
warn!(
"Failed to remove sub directory {}. Error: {}",
sub_path.display(),
err.to_string()
);
}
} else if metadata.is_file() {
if let Err(err) = std::fs::remove_file(&sub_path) {
warn!(
"Failed to remove file {}. Error: {}",
sub_path.display(),
err.to_string()
);
match fs_err::read_dir(path.as_ref()) {
Err(err) => {
warn!("Failed to delete contents: {err}")
}
Ok(dir_entries) => {
for entry in dir_entries.flatten() {
let sub_path = entry.path();
let result = if sub_path.is_dir() {
fs_err::remove_dir_all(&sub_path)
} else {
fs_err::remove_file(&sub_path)
};
if let Err(err) = result {
warn!("Failed to delete contents: {err}");
}
}
}
} else {
warn!(
"Failed to read the sub paths of {}",
path.as_ref().display()
);
}
}
@ -497,7 +476,7 @@ pub fn move_and_async_delete_path_contents(path: impl AsRef<Path>) {
// The following could fail if the rename failed.
// If that happens, the directory should be left as is.
// So we ignore errors here.
let _ = std::fs::create_dir(path);
_ = std::fs::create_dir(path);
}
/// Delete directories/files asynchronously to avoid blocking on it.
@ -530,11 +509,8 @@ pub fn move_and_async_delete_path(path: impl AsRef<Path>) {
path_delete.file_name().unwrap().to_str().unwrap(),
"_to_be_deleted"
));
if let Err(err) = std::fs::rename(&path, &path_delete) {
warn!(
"Path renaming failed: {}. Falling back to rm_dir in sync mode",
err.to_string()
);
if let Err(err) = fs_err::rename(&path, &path_delete) {
warn!("Path renaming failed, falling back to rm_dir in sync mode: {err}");
// Although the delete here is synchronous, we want to prevent another thread
// from moving & deleting this directory via `move_and_async_delete_path`.
lock.insert(path.as_ref().to_path_buf());
@ -607,7 +583,7 @@ pub fn clean_orphaned_account_snapshot_dirs(
/// Purges incomplete bank snapshots
pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
let Ok(read_dir_iter) = fs::read_dir(&bank_snapshots_dir) else {
let Ok(read_dir_iter) = std::fs::read_dir(&bank_snapshots_dir) else {
// If we cannot read the bank snapshots dir, then there's nothing to do
return;
};
@ -629,10 +605,7 @@ pub fn purge_incomplete_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
"Purged incomplete snapshot dir: {}",
incomplete_dir.display()
),
Err(err) => warn!(
"Failed to purge incomplete snapshot dir: {}, {err:?}",
incomplete_dir.display()
),
Err(err) => warn!("Failed to purge incomplete snapshot dir: {err}"),
}
}
}
@ -648,19 +621,22 @@ fn is_bank_snapshot_complete(bank_snapshot_dir: impl AsRef<Path>) -> bool {
/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging
/// directory won't be cleaned up. Call this function to clean them up.
pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
if let Ok(entries) = fs::read_dir(snapshot_archives_dir) {
for entry in entries.filter_map(|entry| entry.ok()) {
if let Ok(entries) = std::fs::read_dir(snapshot_archives_dir) {
for entry in entries.flatten() {
let file_name = entry
.file_name()
.into_string()
.unwrap_or_else(|_| String::new());
if file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX) {
if entry.path().is_file() {
fs_err::remove_file(entry.path())
let path = entry.path();
let result = if path.is_dir() {
fs_err::remove_dir_all(path)
} else {
fs_err::remove_dir_all(entry.path())
fs_err::remove_file(path)
};
if let Err(err) = result {
warn!("Failed to remove temporary snapshot archive: {err}");
}
.unwrap_or_else(|err| warn!("Failed to remove temporary snapshot archive: {err}"));
}
}
}
@ -864,13 +840,9 @@ pub fn archive_snapshot_package(
/// Get the bank snapshots in a directory
pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
let mut bank_snapshots = Vec::default();
match fs::read_dir(&bank_snapshots_dir) {
match fs_err::read_dir(bank_snapshots_dir.as_ref()) {
Err(err) => {
info!(
"Unable to read bank snapshots directory {}: {}",
bank_snapshots_dir.as_ref().display(),
err
);
info!("Unable to read bank snapshots directory: {err}");
}
Ok(paths) => paths
.filter_map(|entry| {
@ -951,7 +923,7 @@ fn do_get_highest_bank_snapshot(
pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
where
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
{
serialize_snapshot_data_file_capped::<F>(
data_file_path,
@ -962,9 +934,9 @@ where
pub fn deserialize_snapshot_data_file<T: Sized>(
data_file_path: &Path,
deserializer: impl FnOnce(&mut BufReader<File>) -> Result<T>,
deserializer: impl FnOnce(&mut BufReader<std::fs::File>) -> Result<T>,
) -> Result<T> {
let wrapped_deserializer = move |streams: &mut SnapshotStreams<File>| -> Result<T> {
let wrapped_deserializer = move |streams: &mut SnapshotStreams<std::fs::File>| -> Result<T> {
deserializer(streams.full_snapshot_stream)
};
@ -982,7 +954,7 @@ pub fn deserialize_snapshot_data_file<T: Sized>(
fn deserialize_snapshot_data_files<T: Sized>(
snapshot_root_paths: &SnapshotRootPaths,
deserializer: impl FnOnce(&mut SnapshotStreams<File>) -> Result<T>,
deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
) -> Result<T> {
deserialize_snapshot_data_files_capped(
snapshot_root_paths,
@ -997,9 +969,9 @@ fn serialize_snapshot_data_file_capped<F>(
serializer: F,
) -> Result<u64>
where
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
F: FnOnce(&mut BufWriter<std::fs::File>) -> Result<()>,
{
let data_file = File::create(data_file_path)?;
let data_file = fs_err::File::create(data_file_path)?.into();
let mut data_file_stream = BufWriter::new(data_file);
serializer(&mut data_file_stream)?;
data_file_stream.flush()?;
@ -1017,7 +989,7 @@ where
fn deserialize_snapshot_data_files_capped<T: Sized>(
snapshot_root_paths: &SnapshotRootPaths,
maximum_file_size: u64,
deserializer: impl FnOnce(&mut SnapshotStreams<File>) -> Result<T>,
deserializer: impl FnOnce(&mut SnapshotStreams<std::fs::File>) -> Result<T>,
) -> Result<T> {
let (full_snapshot_file_size, mut full_snapshot_data_file_stream) =
create_snapshot_data_file_stream(
@ -1072,20 +1044,21 @@ fn deserialize_snapshot_data_files_capped<T: Sized>(
fn create_snapshot_data_file_stream(
snapshot_root_file_path: impl AsRef<Path>,
maximum_file_size: u64,
) -> Result<(u64, BufReader<File>)> {
) -> Result<(u64, BufReader<std::fs::File>)> {
let snapshot_file_size = fs_err::metadata(&snapshot_root_file_path)?.len();
if snapshot_file_size > maximum_file_size {
let error_message =
format!(
let error_message = format!(
"too large snapshot data file to deserialize: {} has {} bytes (max size is {} bytes)",
snapshot_root_file_path.as_ref().display(), snapshot_file_size, maximum_file_size
snapshot_root_file_path.as_ref().display(),
snapshot_file_size,
maximum_file_size,
);
return Err(get_io_error(&error_message));
}
let snapshot_data_file = File::open(&snapshot_root_file_path)?;
let snapshot_data_file_stream = BufReader::new(snapshot_data_file);
let snapshot_data_file = fs_err::File::open(snapshot_root_file_path.as_ref())?;
let snapshot_data_file_stream = BufReader::new(snapshot_data_file.into());
Ok((snapshot_file_size, snapshot_data_file_stream))
}
@ -1095,15 +1068,16 @@ fn create_snapshot_data_file_stream(
fn check_deserialize_file_consumed(
file_size: u64,
file_path: impl AsRef<Path>,
file_stream: &mut BufReader<File>,
file_stream: &mut BufReader<std::fs::File>,
) -> Result<()> {
let consumed_size = file_stream.stream_position()?;
if consumed_size != file_size {
let error_message =
format!(
let error_message = format!(
"invalid snapshot data file: {} has {} bytes, however consumed {} bytes to deserialize",
file_path.as_ref().display(), file_size, consumed_size
file_path.as_ref().display(),
file_size,
consumed_size,
);
return Err(get_io_error(&error_message));
}
@ -1204,10 +1178,10 @@ fn get_snapshot_accounts_hardlink_dir(
fs_err::create_dir_all(&snapshot_hardlink_dir)
.map_err(|err| SnapshotError::IoWithSource(err, "create hard-link dir"))?;
let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
symlink::symlink_dir(&snapshot_hardlink_dir, symlink_path).map_err(|e| {
symlink::symlink_dir(&snapshot_hardlink_dir, symlink_path).map_err(|err| {
SnapshotError::IoWithSourceAndFile(
e,
"simlink the hard-link dir",
err,
"symlink the hard-link dir",
snapshot_hardlink_dir.clone(),
)
})?;
@ -1243,8 +1217,7 @@ fn hard_link_storages_to_snapshot(
// Use the storage slot and id to compose a consistent file name for the hard-link file.
let hardlink_filename = AppendVec::file_name(storage.slot(), storage.append_vec_id());
let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
fs_err::hard_link(&storage_path, &hard_link_path)
.map_err(|err| SnapshotError::IoWithSource(err, "hard-link append vec file"))?;
fs_err::hard_link(&storage_path, &hard_link_path)?;
}
Ok(())
}
@ -1289,7 +1262,7 @@ pub fn add_bank_snapshot(
// from the operational accounts/ directory to here.
hard_link_storages_to_snapshot(&bank_snapshot_dir, slot, snapshot_storages)?;
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
let bank_snapshot_serializer = move |stream: &mut BufWriter<std::fs::File>| -> Result<()> {
let serde_style = match snapshot_version {
SnapshotVersion::V1_2_0 => SerdeStyle::Newer,
};
@ -1912,7 +1885,7 @@ fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Pat
}
// The unpacked dir has a single slot dir, which is the snapshot slot dir.
let slot_dir = fs::read_dir(&snapshots_dir)
let slot_dir = std::fs::read_dir(&snapshots_dir)
.map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
.find(|entry| entry.as_ref().unwrap().path().is_dir())
.ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
@ -1929,7 +1902,7 @@ fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Pat
)?;
let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
fs::File::create(state_complete_file)?;
fs_err::File::create(state_complete_file)?;
Ok(())
}
@ -2021,68 +1994,40 @@ fn build_storage_from_snapshot_dir(
next_append_vec_id: Arc<AtomicAppendVecId>,
) -> Result<AccountStorageMap> {
let bank_snapshot_dir = &snapshot_info.snapshot_dir;
let snapshot_file_path = &snapshot_info.snapshot_path();
let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
let (file_sender, file_receiver) = crossbeam_channel::unbounded();
let accounts_hardlinks = bank_snapshot_dir.join(SNAPSHOT_ACCOUNTS_HARDLINKS);
let account_run_paths: HashSet<_> = HashSet::from_iter(account_paths);
let account_paths_set: HashSet<_> = HashSet::from_iter(account_paths.iter());
for dir_entry in fs::read_dir(&accounts_hardlinks).map_err(|err| {
SnapshotError::IoWithSourceAndFile(
err,
"read_dir failed for accounts_hardlinks",
accounts_hardlinks.to_path_buf(),
)
})? {
for dir_entry in fs_err::read_dir(&accounts_hardlinks)? {
let symlink_path = dir_entry?.path();
// The symlink point to <account_path>/snapshot/<slot> which contain the account files hardlinks
// The corresponding run path should be <account_path>/run/
let snapshot_account_path = fs::read_link(&symlink_path).map_err(|err| {
SnapshotError::IoWithSourceAndFile(
err,
"read_link failed for symlink",
symlink_path.to_path_buf(),
)
})?;
let account_run_path = snapshot_account_path
let account_snapshot_path = fs_err::read_link(&symlink_path)?;
let account_run_path = account_snapshot_path
.parent()
.ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))?
.ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
.parent()
.ok_or_else(|| SnapshotError::InvalidAccountPath(snapshot_account_path.clone()))?
.ok_or_else(|| SnapshotError::InvalidAccountPath(account_snapshot_path.clone()))?
.join("run");
if !account_paths_set.contains(&account_run_path) {
// The appendvec from the bank snapshot stoarge does not match any of the provided account_paths set.
if !account_run_paths.contains(&account_run_path) {
// The appendvec from the bank snapshot storage does not match any of the provided account_paths set.
// The accout paths have changed so the snapshot is no longer usable.
return Err(SnapshotError::AccountPathsMismatch);
}
// Generate hard-links to make the account files available in the main accounts/, and let the new appendvec
// paths be in accounts/
for file in fs::read_dir(&snapshot_account_path).map_err(|err| {
SnapshotError::IoWithSourceAndFile(
err,
"read_dir failed for snapshot_account_path",
snapshot_account_path.to_path_buf(),
)
})? {
for file in fs_err::read_dir(&account_snapshot_path)? {
let file_path = file?.path();
let file_name = file_path
.file_name()
.ok_or_else(|| SnapshotError::InvalidAppendVecPath(file_path.to_path_buf()))?;
let dest_path = account_run_path.clone().join(file_name);
fs::hard_link(&file_path, &dest_path).map_err(|e| {
let err_msg = format!(
"Error: {}. Failed to hard-link {} to {}",
e,
file_path.display(),
dest_path.display()
);
SnapshotError::Io(IoError::new(ErrorKind::Other, err_msg))
})?;
let dest_path = account_run_path.join(file_name);
fs_err::hard_link(&file_path, &dest_path)?;
}
}
let (file_sender, file_receiver) = crossbeam_channel::unbounded();
let snapshot_file_path = &snapshot_info.snapshot_path();
let snapshot_version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
streaming_snapshot_dir_files(
file_sender,
snapshot_file_path,
@ -2123,7 +2068,7 @@ fn snapshot_version_from_file(path: impl AsRef<Path>) -> Result<String> {
// Read snapshot_version from file.
let mut snapshot_version = String::new();
File::open(path).and_then(|mut f| f.read_to_string(&mut snapshot_version))?;
fs_err::File::open(path.as_ref()).and_then(|mut f| f.read_to_string(&mut snapshot_version))?;
Ok(snapshot_version.trim().to_string())
}
@ -2275,11 +2220,7 @@ where
let entry_iter = fs_err::read_dir(dir);
match entry_iter {
Err(err) => {
info!(
"Unable to read snapshot archives directory {}: {}",
dir.display(),
err,
);
info!("Unable to read snapshot archives directory: {err}");
vec![]
}
Ok(entries) => entries
@ -2407,8 +2348,10 @@ pub fn purge_old_snapshot_archives(
fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
for path in archives.iter().map(|a| a.path()) {
trace!("Removing snapshot archive: {}", path.display());
fs_err::remove_file(path)
.unwrap_or_else(|err| info!("Failed to remove {}: {}", path.display(), err));
let result = fs_err::remove_file(path);
if let Err(err) = result {
info!("Failed to remove snapshot archive: {err}",);
}
}
}
remove_archives(full_snapshot_archives_to_remove);
@ -2494,7 +2437,7 @@ fn untar_snapshot_create_shared_buffer(
snapshot_tar: &Path,
archive_format: ArchiveFormat,
) -> SharedBuffer {
let open_file = || File::open(snapshot_tar).unwrap();
let open_file = || fs_err::File::open(snapshot_tar).unwrap();
match archive_format {
ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
@ -3777,10 +3720,10 @@ mod tests {
let snapshot_filename = get_snapshot_file_name(slot);
let snapshot_path = snapshot_dir.join(snapshot_filename);
File::create(snapshot_path).unwrap();
fs_err::File::create(snapshot_path).unwrap();
let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
File::create(status_cache_file).unwrap();
fs_err::File::create(status_cache_file).unwrap();
let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
write_snapshot_version_file(version_path, SnapshotVersion::default()).unwrap();
@ -3840,13 +3783,13 @@ mod tests {
Hash::default()
);
let snapshot_filepath = incremental_snapshot_archives_dir.join(snapshot_filename);
File::create(snapshot_filepath).unwrap();
fs_err::File::create(snapshot_filepath).unwrap();
}
let snapshot_filename =
format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
let snapshot_filepath = full_snapshot_archives_dir.join(snapshot_filename);
File::create(snapshot_filepath).unwrap();
fs_err::File::create(snapshot_filepath).unwrap();
// Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly
let bad_filename = format!(
@ -3855,14 +3798,14 @@ mod tests {
max_incremental_snapshot_slot + 1,
);
let bad_filepath = incremental_snapshot_archives_dir.join(bad_filename);
File::create(bad_filepath).unwrap();
fs_err::File::create(bad_filepath).unwrap();
}
// Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
// sorted correctly
let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1);
let bad_filepath = full_snapshot_archives_dir.join(bad_filename);
File::create(bad_filepath).unwrap();
fs_err::File::create(bad_filepath).unwrap();
}
#[test]
@ -4034,7 +3977,7 @@ mod tests {
for snap_name in snapshot_names {
let snap_path = temp_snap_dir.path().join(snap_name);
let mut _snap_file = File::create(snap_path);
let mut _snap_file = fs_err::File::create(snap_path);
}
purge_old_snapshot_archives(
temp_snap_dir.path(),
@ -4116,7 +4059,7 @@ mod tests {
let full_snapshot_archive_path = full_snapshot_archives_dir
.as_ref()
.join(full_snapshot_archive_file_name);
File::create(full_snapshot_archive_path).unwrap();
fs_err::File::create(full_snapshot_archive_path).unwrap();
// don't purge-and-check until enough snapshot archives have been created
if slot < starting_slot + maximum_snapshots_to_retain.get() as Slot {
@ -4177,7 +4120,7 @@ mod tests {
let snapshot_filename =
format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
let snapshot_path = full_snapshot_archives_dir.path().join(&snapshot_filename);
File::create(snapshot_path).unwrap();
fs_err::File::create(snapshot_path).unwrap();
snapshot_filenames.push(snapshot_filename);
(full_snapshot_slot..)
@ -4194,7 +4137,7 @@ mod tests {
let snapshot_path = incremental_snapshot_archives_dir
.path()
.join(&snapshot_filename);
File::create(snapshot_path).unwrap();
fs_err::File::create(snapshot_path).unwrap();
snapshot_filenames.push(snapshot_filename);
});
});
@ -4297,7 +4240,7 @@ mod tests {
let snapshot_path = incremental_snapshot_archives_dir
.path()
.join(snapshot_filenames);
File::create(snapshot_path).unwrap();
fs_err::File::create(snapshot_path).unwrap();
}
purge_old_snapshot_archives(