Adds error types for all of `add_bank_snapshot()` (#32564)
This commit is contained in:
parent
d73fa1b590
commit
faa002c2b6
|
@ -379,6 +379,9 @@ pub enum SnapshotError {
|
|||
|
||||
#[error("snapshot dir account paths mismatching")]
|
||||
AccountPathsMismatch,
|
||||
|
||||
#[error("failed to add bank snapshot for slot {1}: {0}")]
|
||||
AddBankSnapshot(#[source] AddBankSnapshotError, Slot),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
|
@ -432,6 +435,64 @@ pub enum VerifySlotDeltasError {
|
|||
BadSlotHistory,
|
||||
}
|
||||
|
||||
/// Errors that can happen in `add_bank_snapshot()`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum AddBankSnapshotError {
|
||||
#[error("bank snapshot dir already exists: {0}")]
|
||||
SnapshotDirAlreadyExists(PathBuf),
|
||||
|
||||
#[error("failed to create snapshot dir: {0}")]
|
||||
CreateSnapshotDir(#[source] std::io::Error),
|
||||
|
||||
#[error("failed to hard link storages: {0}")]
|
||||
HardLinkStorages(#[source] HardLinkStoragesToSnapshotError),
|
||||
|
||||
#[error("failed to serialize bank: {0}")]
|
||||
SerializeBank(#[source] Box<SnapshotError>),
|
||||
|
||||
#[error("failed to serialize status cache: {0}")]
|
||||
SerializeStatusCache(#[source] Box<SnapshotError>),
|
||||
|
||||
#[error("failed to write snapshot version file: {0}")]
|
||||
WriteSnapshotVersionFile(#[source] std::io::Error),
|
||||
|
||||
#[error("failed to mark snapshot as 'complete': {0}")]
|
||||
CreateStateCompleteFile(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
/// Errors that can happen in `hard_link_storages_to_snapshot()`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HardLinkStoragesToSnapshotError {
|
||||
#[error("failed to create accounts hard links dir: {0}")]
|
||||
CreateAccountsHardLinksDir(#[source] std::io::Error),
|
||||
|
||||
#[error("failed to flush storage: {0}")]
|
||||
FlushStorage(#[source] AccountsFileError),
|
||||
|
||||
#[error("failed to get the snapshot's accounts hard link dir: {0}")]
|
||||
GetSnapshotHardLinksDir(#[from] GetSnapshotAccountsHardLinkDirError),
|
||||
|
||||
#[error("failed to hard link storage: {0}")]
|
||||
HardLinkStorage(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
/// Errors that can happen in `get_snapshot_accounts_hardlink_dir()`
|
||||
#[derive(Error, Debug)]
|
||||
pub enum GetSnapshotAccountsHardLinkDirError {
|
||||
#[error("invalid account storage path: {0}")]
|
||||
GetAccountPath(PathBuf),
|
||||
|
||||
#[error("failed to create the snapshot hard link dir: {0}")]
|
||||
CreateSnapshotHardLinkDir(#[source] std::io::Error),
|
||||
|
||||
#[error("failed to symlink snapshot hard link dir {link} to {original}: {source}")]
|
||||
SymlinkSnapshotHardLinkDir {
|
||||
source: std::io::Error,
|
||||
original: PathBuf,
|
||||
link: PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
/// Creates directories if they do not exist, and canonicalizes the paths.
|
||||
pub fn create_and_canonicalize_directories(directories: &[PathBuf]) -> Result<Vec<PathBuf>> {
|
||||
directories
|
||||
|
@ -647,9 +708,8 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
|
|||
pub fn write_snapshot_version_file(
|
||||
version_file: impl AsRef<Path>,
|
||||
version: SnapshotVersion,
|
||||
) -> Result<()> {
|
||||
) -> std::io::Result<()> {
|
||||
fs_err::write(version_file, version.as_str().as_bytes())
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "write snapshot version file"))
|
||||
}
|
||||
|
||||
/// Make a snapshot archive out of the snapshot package
|
||||
|
@ -736,7 +796,8 @@ pub fn archive_snapshot_package(
|
|||
}
|
||||
}
|
||||
|
||||
write_snapshot_version_file(staging_version_file, snapshot_package.snapshot_version)?;
|
||||
write_snapshot_version_file(staging_version_file, snapshot_package.snapshot_version)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "write snapshot version file"))?;
|
||||
|
||||
// Tar the staging directory into the archive at `archive_path`
|
||||
let archive_path = tar_dir.join(format!(
|
||||
|
@ -1161,9 +1222,10 @@ fn get_snapshot_accounts_hardlink_dir(
|
|||
bank_slot: Slot,
|
||||
account_paths: &mut HashSet<PathBuf>,
|
||||
hardlinks_dir: impl AsRef<Path>,
|
||||
) -> Result<PathBuf> {
|
||||
let account_path = get_account_path_from_appendvec_path(appendvec_path)
|
||||
.ok_or_else(|| SnapshotError::InvalidAppendVecPath(appendvec_path.to_path_buf()))?;
|
||||
) -> std::result::Result<PathBuf, GetSnapshotAccountsHardLinkDirError> {
|
||||
let account_path = get_account_path_from_appendvec_path(appendvec_path).ok_or_else(|| {
|
||||
GetSnapshotAccountsHardLinkDirError::GetAccountPath(appendvec_path.to_path_buf())
|
||||
})?;
|
||||
|
||||
let snapshot_hardlink_dir = account_path.join("snapshot").join(bank_slot.to_string());
|
||||
|
||||
|
@ -1177,14 +1239,14 @@ fn get_snapshot_accounts_hardlink_dir(
|
|||
snapshot_hardlink_dir.display()
|
||||
);
|
||||
fs_err::create_dir_all(&snapshot_hardlink_dir)
|
||||
.map_err(|err| SnapshotError::IoWithSource(err, "create hard-link dir"))?;
|
||||
.map_err(GetSnapshotAccountsHardLinkDirError::CreateSnapshotHardLinkDir)?;
|
||||
let symlink_path = hardlinks_dir.as_ref().join(format!("account_path_{idx}"));
|
||||
symlink::symlink_dir(&snapshot_hardlink_dir, symlink_path).map_err(|err| {
|
||||
SnapshotError::IoWithSourceAndFile(
|
||||
err,
|
||||
"symlink the hard-link dir",
|
||||
snapshot_hardlink_dir.clone(),
|
||||
)
|
||||
symlink::symlink_dir(&snapshot_hardlink_dir, &symlink_path).map_err(|err| {
|
||||
GetSnapshotAccountsHardLinkDirError::SymlinkSnapshotHardLinkDir {
|
||||
source: err,
|
||||
original: snapshot_hardlink_dir.clone(),
|
||||
link: symlink_path,
|
||||
}
|
||||
})?;
|
||||
account_paths.insert(account_path);
|
||||
};
|
||||
|
@ -1200,13 +1262,16 @@ fn hard_link_storages_to_snapshot(
|
|||
bank_snapshot_dir: impl AsRef<Path>,
|
||||
bank_slot: Slot,
|
||||
snapshot_storages: &[Arc<AccountStorageEntry>],
|
||||
) -> Result<()> {
|
||||
) -> std::result::Result<(), HardLinkStoragesToSnapshotError> {
|
||||
let accounts_hardlinks_dir = bank_snapshot_dir.as_ref().join(SNAPSHOT_ACCOUNTS_HARDLINKS);
|
||||
fs_err::create_dir_all(&accounts_hardlinks_dir)?;
|
||||
fs_err::create_dir_all(&accounts_hardlinks_dir)
|
||||
.map_err(HardLinkStoragesToSnapshotError::CreateAccountsHardLinksDir)?;
|
||||
|
||||
let mut account_paths: HashSet<PathBuf> = HashSet::new();
|
||||
for storage in snapshot_storages {
|
||||
storage.flush()?;
|
||||
storage
|
||||
.flush()
|
||||
.map_err(HardLinkStoragesToSnapshotError::FlushStorage)?;
|
||||
let storage_path = storage.accounts.get_path();
|
||||
let snapshot_hardlink_dir = get_snapshot_accounts_hardlink_dir(
|
||||
&storage_path,
|
||||
|
@ -1218,7 +1283,8 @@ fn hard_link_storages_to_snapshot(
|
|||
// Use the storage slot and id to compose a consistent file name for the hard-link file.
|
||||
let hardlink_filename = AppendVec::file_name(storage.slot(), storage.append_vec_id());
|
||||
let hard_link_path = snapshot_hardlink_dir.join(hardlink_filename);
|
||||
fs_err::hard_link(&storage_path, &hard_link_path)?;
|
||||
fs_err::hard_link(&storage_path, &hard_link_path)
|
||||
.map_err(HardLinkStoragesToSnapshotError::HardLinkStorage)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1236,90 +1302,102 @@ pub fn add_bank_snapshot(
|
|||
snapshot_version: SnapshotVersion,
|
||||
slot_deltas: Vec<BankSlotDelta>,
|
||||
) -> Result<BankSnapshotInfo> {
|
||||
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
|
||||
let slot = bank.slot();
|
||||
let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
|
||||
assert!(
|
||||
!bank_snapshot_dir.exists(),
|
||||
"A bank snapshot already exists for slot {slot}!? Path: {}",
|
||||
bank_snapshot_dir.display()
|
||||
);
|
||||
fs_err::create_dir_all(&bank_snapshot_dir)?;
|
||||
// this lambda function is to facilitate converting between
|
||||
// the AddBankSnapshotError and SnapshotError types
|
||||
let do_add_bank_snapshot = || {
|
||||
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
|
||||
let slot = bank.slot();
|
||||
let bank_snapshot_dir = get_bank_snapshot_dir(&bank_snapshots_dir, slot);
|
||||
if bank_snapshot_dir.exists() {
|
||||
return Err(AddBankSnapshotError::SnapshotDirAlreadyExists(
|
||||
bank_snapshot_dir,
|
||||
));
|
||||
}
|
||||
fs_err::create_dir_all(&bank_snapshot_dir)
|
||||
.map_err(AddBankSnapshotError::CreateSnapshotDir)?;
|
||||
|
||||
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
|
||||
let bank_snapshot_path = bank_snapshot_dir
|
||||
.join(get_snapshot_file_name(slot))
|
||||
.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||
// the bank snapshot is stored as bank_snapshots_dir/slot/slot.BANK_SNAPSHOT_PRE_FILENAME_EXTENSION
|
||||
let bank_snapshot_path = bank_snapshot_dir
|
||||
.join(get_snapshot_file_name(slot))
|
||||
.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||
|
||||
info!(
|
||||
"Creating bank snapshot for slot {}, path: {}",
|
||||
slot,
|
||||
bank_snapshot_path.display(),
|
||||
);
|
||||
info!(
|
||||
"Creating bank snapshot for slot {}, path: {}",
|
||||
slot,
|
||||
bank_snapshot_path.display(),
|
||||
);
|
||||
|
||||
// We are constructing the snapshot directory to contain the full snapshot state information to allow
|
||||
// constructing a bank from this directory. It acts like an archive to include the full state.
|
||||
// The set of the account storages files is the necessary part of this snapshot state. Hard-link them
|
||||
// from the operational accounts/ directory to here.
|
||||
hard_link_storages_to_snapshot(&bank_snapshot_dir, slot, snapshot_storages)?;
|
||||
// We are constructing the snapshot directory to contain the full snapshot state information to allow
|
||||
// constructing a bank from this directory. It acts like an archive to include the full state.
|
||||
// The set of the account storages files is the necessary part of this snapshot state. Hard-link them
|
||||
// from the operational accounts/ directory to here.
|
||||
hard_link_storages_to_snapshot(&bank_snapshot_dir, slot, snapshot_storages)
|
||||
.map_err(AddBankSnapshotError::HardLinkStorages)?;
|
||||
|
||||
let bank_snapshot_serializer = move |stream: &mut BufWriter<std::fs::File>| -> Result<()> {
|
||||
let serde_style = match snapshot_version {
|
||||
SnapshotVersion::V1_2_0 => SerdeStyle::Newer,
|
||||
let bank_snapshot_serializer = move |stream: &mut BufWriter<std::fs::File>| -> Result<()> {
|
||||
let serde_style = match snapshot_version {
|
||||
SnapshotVersion::V1_2_0 => SerdeStyle::Newer,
|
||||
};
|
||||
bank_to_stream(
|
||||
serde_style,
|
||||
stream.by_ref(),
|
||||
bank,
|
||||
&get_storages_to_serialize(snapshot_storages),
|
||||
)?;
|
||||
Ok(())
|
||||
};
|
||||
bank_to_stream(
|
||||
serde_style,
|
||||
stream.by_ref(),
|
||||
bank,
|
||||
&get_storages_to_serialize(snapshot_storages),
|
||||
)?;
|
||||
Ok(())
|
||||
let (bank_snapshot_consumed_size, bank_serialize) = measure!(
|
||||
serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)
|
||||
.map_err(|err| AddBankSnapshotError::SerializeBank(Box::new(err)))?,
|
||||
"bank serialize"
|
||||
);
|
||||
add_snapshot_time.stop();
|
||||
|
||||
let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
let (status_cache_consumed_size, status_cache_serialize) =
|
||||
measure!(serialize_status_cache(&slot_deltas, &status_cache_path)
|
||||
.map_err(|err| AddBankSnapshotError::SerializeStatusCache(Box::new(err)))?);
|
||||
|
||||
let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
write_snapshot_version_file(version_path, snapshot_version)
|
||||
.map_err(AddBankSnapshotError::WriteSnapshotVersionFile)?;
|
||||
|
||||
// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
|
||||
let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
fs_err::File::create(state_complete_path)
|
||||
.map_err(AddBankSnapshotError::CreateStateCompleteFile)?;
|
||||
|
||||
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
|
||||
datapoint_info!(
|
||||
"snapshot-bank-file",
|
||||
("slot", slot, i64),
|
||||
("bank_size", bank_snapshot_consumed_size, i64),
|
||||
("status_cache_size", status_cache_consumed_size, i64),
|
||||
("bank_serialize_ms", bank_serialize.as_ms(), i64),
|
||||
("add_snapshot_ms", add_snapshot_time.as_ms(), i64),
|
||||
(
|
||||
"status_cache_serialize_ms",
|
||||
status_cache_serialize.as_ms(),
|
||||
i64
|
||||
),
|
||||
);
|
||||
|
||||
info!(
|
||||
"{} for slot {} at {}",
|
||||
bank_serialize,
|
||||
slot,
|
||||
bank_snapshot_path.display(),
|
||||
);
|
||||
|
||||
Ok(BankSnapshotInfo {
|
||||
slot,
|
||||
snapshot_type: BankSnapshotType::Pre,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
snapshot_version,
|
||||
})
|
||||
};
|
||||
let (bank_snapshot_consumed_size, bank_serialize) = measure!(
|
||||
serialize_snapshot_data_file(&bank_snapshot_path, bank_snapshot_serializer)?,
|
||||
"bank serialize"
|
||||
);
|
||||
add_snapshot_time.stop();
|
||||
|
||||
let status_cache_path = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
let (status_cache_consumed_size, status_cache_serialize) =
|
||||
measure!(serialize_status_cache(&slot_deltas, &status_cache_path)?);
|
||||
|
||||
let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
write_snapshot_version_file(version_path, snapshot_version)?;
|
||||
|
||||
// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
|
||||
let state_complete_path = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
fs_err::File::create(state_complete_path)?;
|
||||
|
||||
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
|
||||
datapoint_info!(
|
||||
"snapshot-bank-file",
|
||||
("slot", slot, i64),
|
||||
("bank_size", bank_snapshot_consumed_size, i64),
|
||||
("status_cache_size", status_cache_consumed_size, i64),
|
||||
("bank_serialize_ms", bank_serialize.as_ms(), i64),
|
||||
("add_snapshot_ms", add_snapshot_time.as_ms(), i64),
|
||||
(
|
||||
"status_cache_serialize_ms",
|
||||
status_cache_serialize.as_ms(),
|
||||
i64
|
||||
),
|
||||
);
|
||||
|
||||
info!(
|
||||
"{} for slot {} at {}",
|
||||
bank_serialize,
|
||||
slot,
|
||||
bank_snapshot_path.display(),
|
||||
);
|
||||
|
||||
Ok(BankSnapshotInfo {
|
||||
slot,
|
||||
snapshot_type: BankSnapshotType::Pre,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
snapshot_version,
|
||||
})
|
||||
do_add_bank_snapshot().map_err(|err| SnapshotError::AddBankSnapshot(err, bank.slot()))
|
||||
}
|
||||
|
||||
/// serializing needs Vec<Vec<Arc<AccountStorageEntry>>>, but data structure at runtime is Vec<Arc<AccountStorageEntry>>
|
||||
|
@ -5178,7 +5256,10 @@ mod tests {
|
|||
accounts_hardlinks_dir,
|
||||
);
|
||||
|
||||
assert!(matches!(ret, Err(SnapshotError::InvalidAppendVecPath(_))));
|
||||
assert!(matches!(
|
||||
ret,
|
||||
Err(GetSnapshotAccountsHardLinkDirError::GetAccountPath(_))
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue