Returns ArchiveSnapshotPackageError from archive_snapshot_package() (#34855)
This commit is contained in:
parent
2b0b5ae1ba
commit
a915e2f2a0
|
@ -301,9 +301,6 @@ pub enum SnapshotError {
|
|||
#[error("archive generation failure {0}")]
|
||||
ArchiveGenerationFailure(ExitStatus),
|
||||
|
||||
#[error("storage path symlink is invalid '{0}'")]
|
||||
StoragePathSymlinkInvalid(PathBuf),
|
||||
|
||||
#[error("Unpack error: {0}")]
|
||||
UnpackError(#[from] UnpackError),
|
||||
|
||||
|
@ -351,6 +348,9 @@ pub enum SnapshotError {
|
|||
|
||||
#[error("failed to add bank snapshot for slot {1}: {0}")]
|
||||
AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
|
||||
|
||||
#[error("failed to archive snapshot package: {0}")]
|
||||
ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
|
@ -426,6 +426,73 @@ pub enum AddBankSnapshotError {
|
|||
CreateStateCompleteFile(#[source] IoError),
|
||||
}
|
||||
|
||||
/// Errors that can happen in `archive_snapshot_package()`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ArchiveSnapshotPackageError {
|
||||
#[error("failed to create archive path '{1}': {0}")]
|
||||
CreateArchiveDir(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to create staging dir inside '{1}': {0}")]
|
||||
CreateStagingDir(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to create accounts staging dir '{1}': {0}")]
|
||||
CreateAccountsStagingDir(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to create snapshot staging dir '{1}': {0}")]
|
||||
CreateSnapshotStagingDir(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to canonicalize snapshot source dir '{1}': {0}")]
|
||||
CanonicalizeSnapshotSourceDir(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to symlink snapshot from '{1}' to '{2}': {0}")]
|
||||
SymlinkSnapshot(#[source] IoError, PathBuf, PathBuf),
|
||||
|
||||
#[error("failed to symlink status cache from '{1}' to '{2}': {0}")]
|
||||
SymlinkStatusCache(#[source] IoError, PathBuf, PathBuf),
|
||||
|
||||
#[error("failed to symlink version file from '{1}' to '{2}': {0}")]
|
||||
SymlinkVersionFile(#[source] IoError, PathBuf, PathBuf),
|
||||
|
||||
#[error("failed to flush account storage file '{1}': {0}")]
|
||||
FlushAccountStorageFile(#[source] AccountsFileError, PathBuf),
|
||||
|
||||
#[error("failed to canonicalize account storage file '{1}': {0}")]
|
||||
CanonicalizeAccountStorageFile(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to symlink account storage file from '{1}' to '{2}': {0}")]
|
||||
SymlinkAccountStorageFile(#[source] IoError, PathBuf, PathBuf),
|
||||
|
||||
#[error("account storage staging file is invalid '{0}'")]
|
||||
InvalidAccountStorageStagingFile(PathBuf),
|
||||
|
||||
#[error("failed to create archive file '{1}': {0}")]
|
||||
CreateArchiveFile(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to archive version file: {0}")]
|
||||
ArchiveVersionFile(#[source] IoError),
|
||||
|
||||
#[error("failed to archive snapshots dir: {0}")]
|
||||
ArchiveSnapshotsDir(#[source] IoError),
|
||||
|
||||
#[error("failed to archive accounts dir: {0}")]
|
||||
ArchiveAccountsDir(#[source] IoError),
|
||||
|
||||
#[error("failed to archive snapshot: {0}")]
|
||||
FinishArchive(#[source] IoError),
|
||||
|
||||
#[error("failed to create encoder: {0}")]
|
||||
CreateEncoder(#[source] IoError),
|
||||
|
||||
#[error("failed to encode archive: {0}")]
|
||||
FinishEncoder(#[source] IoError),
|
||||
|
||||
#[error("failed to query archive metadata '{1}': {0}")]
|
||||
QueryArchiveMetadata(#[source] IoError, PathBuf),
|
||||
|
||||
#[error("failed to move archive from '{1}' to '{2}': {0}")]
|
||||
MoveArchive(#[source] IoError, PathBuf, PathBuf),
|
||||
}
|
||||
|
||||
/// Errors that can happen in `hard_link_storages_to_snapshot()`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HardLinkStoragesToSnapshotError {
|
||||
|
@ -664,6 +731,9 @@ pub fn archive_snapshot_package(
|
|||
maximum_full_snapshot_archives_to_retain: NonZeroUsize,
|
||||
maximum_incremental_snapshot_archives_to_retain: NonZeroUsize,
|
||||
) -> Result<()> {
|
||||
use ArchiveSnapshotPackageError as E;
|
||||
const SNAPSHOTS_DIR: &str = "snapshots";
|
||||
const ACCOUNTS_DIR: &str = "accounts";
|
||||
info!(
|
||||
"Generating snapshot archive for slot {}",
|
||||
snapshot_package.slot()
|
||||
|
@ -675,8 +745,7 @@ pub fn archive_snapshot_package(
|
|||
.parent()
|
||||
.expect("Tar output path is invalid");
|
||||
|
||||
fs_err::create_dir_all(tar_dir)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "create archive path"))?;
|
||||
fs::create_dir_all(tar_dir).map_err(|err| E::CreateArchiveDir(err, tar_dir.to_path_buf()))?;
|
||||
|
||||
// Create the staging directories
|
||||
let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
|
||||
|
@ -687,61 +756,65 @@ pub fn archive_snapshot_package(
|
|||
snapshot_package.slot()
|
||||
))
|
||||
.tempdir_in(tar_dir)
|
||||
.map_err(|e| SnapshotError::IoWithSource(e, "create archive tempdir"))?;
|
||||
.map_err(|err| E::CreateStagingDir(err, tar_dir.to_path_buf()))?;
|
||||
|
||||
let staging_accounts_dir = staging_dir.path().join("accounts");
|
||||
let staging_snapshots_dir = staging_dir.path().join("snapshots");
|
||||
let staging_snapshots_dir = staging_dir.path().join(SNAPSHOTS_DIR);
|
||||
let staging_accounts_dir = staging_dir.path().join(ACCOUNTS_DIR);
|
||||
|
||||
// Create staging/accounts/
|
||||
fs_err::create_dir_all(&staging_accounts_dir)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "create staging accounts path"))?;
|
||||
fs::create_dir_all(&staging_accounts_dir)
|
||||
.map_err(|err| E::CreateAccountsStagingDir(err, staging_accounts_dir.clone()))?;
|
||||
|
||||
let slot_str = snapshot_package.slot().to_string();
|
||||
let staging_snapshot_dir = staging_snapshots_dir.join(&slot_str);
|
||||
// Creates staging snapshots/<slot>/
|
||||
fs_err::create_dir_all(&staging_snapshot_dir)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "create staging snapshots path"))?;
|
||||
fs::create_dir_all(&staging_snapshot_dir)
|
||||
.map_err(|err| E::CreateSnapshotStagingDir(err, staging_snapshot_dir.clone()))?;
|
||||
|
||||
let src_snapshot_dir = &snapshot_package.bank_snapshot_dir;
|
||||
// To be a source for symlinking and archiving, the path need to be an absolute path
|
||||
let src_snapshot_dir = src_snapshot_dir
|
||||
.canonicalize()
|
||||
.map_err(|_e| SnapshotError::InvalidSnapshotDirPath(src_snapshot_dir.clone()))?;
|
||||
.map_err(|err| E::CanonicalizeSnapshotSourceDir(err, src_snapshot_dir.clone()))?;
|
||||
let staging_snapshot_file = staging_snapshot_dir.join(&slot_str);
|
||||
let src_snapshot_file = src_snapshot_dir.join(slot_str);
|
||||
symlink::symlink_file(src_snapshot_file, staging_snapshot_file)
|
||||
.map_err(|e| SnapshotError::IoWithSource(e, "create snapshot symlink"))?;
|
||||
symlink::symlink_file(&src_snapshot_file, &staging_snapshot_file)
|
||||
.map_err(|err| E::SymlinkSnapshot(err, src_snapshot_file, staging_snapshot_file))?;
|
||||
|
||||
// Following the existing archive format, the status cache is under snapshots/, not under <slot>/
|
||||
// like in the snapshot dir.
|
||||
let staging_status_cache = staging_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
let src_status_cache = src_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
symlink::symlink_file(src_status_cache, staging_status_cache)
|
||||
.map_err(|e| SnapshotError::IoWithSource(e, "create status cache symlink"))?;
|
||||
symlink::symlink_file(&src_status_cache, &staging_status_cache)
|
||||
.map_err(|err| E::SymlinkStatusCache(err, src_status_cache, staging_status_cache))?;
|
||||
|
||||
// The bank snapshot has the version file, so symlink it to the correct staging path
|
||||
let staging_version_file = staging_dir.path().join(SNAPSHOT_VERSION_FILENAME);
|
||||
let src_version_file = src_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
symlink::symlink_file(src_version_file, staging_version_file)
|
||||
.map_err(|e| SnapshotError::IoWithSource(e, "create version file symlink"))?;
|
||||
symlink::symlink_file(&src_version_file, &staging_version_file).map_err(|err| {
|
||||
E::SymlinkVersionFile(err, src_version_file, staging_version_file.clone())
|
||||
})?;
|
||||
|
||||
// Add the AppendVecs into the compressible list
|
||||
for storage in snapshot_package.snapshot_storages.iter() {
|
||||
storage.flush()?;
|
||||
let storage_path = storage.get_path();
|
||||
let output_path = staging_accounts_dir.join(AppendVec::file_name(
|
||||
storage
|
||||
.flush()
|
||||
.map_err(|err| E::FlushAccountStorageFile(err, storage_path.clone()))?;
|
||||
let staging_storage_path = staging_accounts_dir.join(AppendVec::file_name(
|
||||
storage.slot(),
|
||||
storage.append_vec_id(),
|
||||
));
|
||||
|
||||
// `storage_path` - The file path where the AppendVec itself is located
|
||||
// `output_path` - The file path where the AppendVec will be placed in the staging directory.
|
||||
let storage_path =
|
||||
fs_err::canonicalize(storage_path).expect("Could not get absolute path for accounts");
|
||||
symlink::symlink_file(storage_path, &output_path)
|
||||
.map_err(|e| SnapshotError::IoWithSource(e, "create storage symlink"))?;
|
||||
if !output_path.is_file() {
|
||||
return Err(SnapshotError::StoragePathSymlinkInvalid(output_path));
|
||||
// `src_storage_path` - The file path where the AppendVec itself is located
|
||||
// `staging_storage_path` - The file path where the AppendVec will be placed in the staging directory.
|
||||
let src_storage_path = fs::canonicalize(&storage_path)
|
||||
.map_err(|err| E::CanonicalizeAccountStorageFile(err, storage_path))?;
|
||||
symlink::symlink_file(&src_storage_path, &staging_storage_path).map_err(|err| {
|
||||
E::SymlinkAccountStorageFile(err, src_storage_path, staging_storage_path.clone())
|
||||
})?;
|
||||
if !staging_storage_path.is_file() {
|
||||
return Err(E::InvalidAccountStorageStagingFile(staging_storage_path).into());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -754,20 +827,23 @@ pub fn archive_snapshot_package(
|
|||
));
|
||||
|
||||
{
|
||||
let mut archive_file = fs_err::File::create(&archive_path)?;
|
||||
let mut archive_file = fs::File::create(&archive_path)
|
||||
.map_err(|err| E::CreateArchiveFile(err, archive_path.clone()))?;
|
||||
|
||||
let do_archive_files = |encoder: &mut dyn Write| -> Result<()> {
|
||||
let do_archive_files = |encoder: &mut dyn Write| -> std::result::Result<(), E> {
|
||||
let mut archive = tar::Builder::new(encoder);
|
||||
// Serialize the version and snapshots files before accounts so we can quickly determine the version
|
||||
// and other bank fields. This is necessary if we want to interleave unpacking with reconstruction
|
||||
archive.append_path_with_name(
|
||||
staging_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME),
|
||||
SNAPSHOT_VERSION_FILENAME,
|
||||
)?;
|
||||
for dir in ["snapshots", "accounts"] {
|
||||
archive.append_dir_all(dir, staging_dir.as_ref().join(dir))?;
|
||||
}
|
||||
archive.into_inner()?;
|
||||
archive
|
||||
.append_path_with_name(&staging_version_file, SNAPSHOT_VERSION_FILENAME)
|
||||
.map_err(E::ArchiveVersionFile)?;
|
||||
archive
|
||||
.append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
|
||||
.map_err(E::ArchiveSnapshotsDir)?;
|
||||
archive
|
||||
.append_dir_all(ACCOUNTS_DIR, &staging_accounts_dir)
|
||||
.map_err(E::ArchiveAccountsDir)?;
|
||||
archive.into_inner().map_err(E::FinishArchive)?;
|
||||
Ok(())
|
||||
};
|
||||
|
||||
|
@ -776,24 +852,28 @@ pub fn archive_snapshot_package(
|
|||
let mut encoder =
|
||||
bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::best());
|
||||
do_archive_files(&mut encoder)?;
|
||||
encoder.finish()?;
|
||||
encoder.finish().map_err(E::FinishEncoder)?;
|
||||
}
|
||||
ArchiveFormat::TarGzip => {
|
||||
let mut encoder =
|
||||
flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
|
||||
do_archive_files(&mut encoder)?;
|
||||
encoder.finish()?;
|
||||
encoder.finish().map_err(E::FinishEncoder)?;
|
||||
}
|
||||
ArchiveFormat::TarZstd => {
|
||||
let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?;
|
||||
let mut encoder =
|
||||
zstd::stream::Encoder::new(archive_file, 0).map_err(E::CreateEncoder)?;
|
||||
do_archive_files(&mut encoder)?;
|
||||
encoder.finish()?;
|
||||
encoder.finish().map_err(E::FinishEncoder)?;
|
||||
}
|
||||
ArchiveFormat::TarLz4 => {
|
||||
let mut encoder = lz4::EncoderBuilder::new().level(1).build(archive_file)?;
|
||||
let mut encoder = lz4::EncoderBuilder::new()
|
||||
.level(1)
|
||||
.build(archive_file)
|
||||
.map_err(E::CreateEncoder)?;
|
||||
do_archive_files(&mut encoder)?;
|
||||
let (_output, result) = encoder.finish();
|
||||
result?
|
||||
result.map_err(E::FinishEncoder)?;
|
||||
}
|
||||
ArchiveFormat::Tar => {
|
||||
do_archive_files(&mut archive_file)?;
|
||||
|
@ -802,10 +882,10 @@ pub fn archive_snapshot_package(
|
|||
}
|
||||
|
||||
// Atomically move the archive into position for other validators to find
|
||||
let metadata = fs_err::metadata(&archive_path)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "archive path stat"))?;
|
||||
fs_err::rename(&archive_path, snapshot_package.path())
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "archive path rename"))?;
|
||||
let metadata = fs::metadata(&archive_path)
|
||||
.map_err(|err| E::QueryArchiveMetadata(err, archive_path.clone()))?;
|
||||
fs::rename(&archive_path, snapshot_package.path())
|
||||
.map_err(|err| E::MoveArchive(err, archive_path, snapshot_package.path().clone()))?;
|
||||
|
||||
purge_old_snapshot_archives(
|
||||
full_snapshot_archives_dir,
|
||||
|
@ -816,8 +896,8 @@ pub fn archive_snapshot_package(
|
|||
|
||||
timer.stop();
|
||||
info!(
|
||||
"Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
|
||||
snapshot_package.path(),
|
||||
"Successfully created {}. slot: {}, elapsed ms: {}, size: {}",
|
||||
snapshot_package.path().display(),
|
||||
snapshot_package.slot(),
|
||||
timer.as_ms(),
|
||||
metadata.len()
|
||||
|
|
Loading…
Reference in New Issue