Add checks when constructing a BankSnapshotInfo from a directory (#30373)
* Read snapshot directories and find the highest * format check * Fix test_get_bank_snapshots * fix test_bank_fields_from_snapshot * review changes, is_file etc * removed incremental snapshot * SnapshotNewFromDirError * nit and comments issues * NewFromDir(#[from] SnapshotNewFromDirError) * change fill to create * replace unwrap with map_err and ok_or_else * Remove BankForks, fix the bank loop
This commit is contained in:
parent
e0abad43c3
commit
510be7bdb5
|
@ -143,6 +143,8 @@ pub struct BankSnapshotInfo {
|
|||
pub snapshot_type: BankSnapshotType,
|
||||
/// Path to the bank snapshot directory
|
||||
pub snapshot_dir: PathBuf,
|
||||
/// Snapshot version
|
||||
pub snapshot_version: SnapshotVersion,
|
||||
}
|
||||
|
||||
impl PartialOrd for BankSnapshotInfo {
|
||||
|
@ -162,31 +164,58 @@ impl BankSnapshotInfo {
|
|||
pub fn new_from_dir(
|
||||
bank_snapshots_dir: impl AsRef<Path>,
|
||||
slot: Slot,
|
||||
) -> Option<BankSnapshotInfo> {
|
||||
) -> std::result::Result<BankSnapshotInfo, SnapshotNewFromDirError> {
|
||||
// check this directory to see if there is a BankSnapshotPre and/or
|
||||
// BankSnapshotPost file
|
||||
let bank_snapshot_dir = get_bank_snapshots_dir(&bank_snapshots_dir, slot);
|
||||
|
||||
if !bank_snapshot_dir.is_dir() {
|
||||
return Err(SnapshotNewFromDirError::InvalidBankSnapshotDir(
|
||||
bank_snapshot_dir,
|
||||
));
|
||||
}
|
||||
|
||||
let status_cache_file = bank_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
if !fs::metadata(&status_cache_file)?.is_file() {
|
||||
return Err(SnapshotNewFromDirError::MissingStatusCacheFile(
|
||||
status_cache_file,
|
||||
));
|
||||
}
|
||||
|
||||
let version_path = bank_snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
let version_str = snapshot_version_from_file(&version_path).or(Err(
|
||||
SnapshotNewFromDirError::MissingVersionFile(version_path),
|
||||
))?;
|
||||
let snapshot_version = SnapshotVersion::from_str(version_str.as_str())
|
||||
.or(Err(SnapshotNewFromDirError::InvalidVersion))?;
|
||||
|
||||
// There is a time window from the slot directory being created, and the content being completely
|
||||
// filled. Check the completion to avoid using a highest found slot directory with missing content.
|
||||
let completion_flag_file = bank_snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
if !fs::metadata(&completion_flag_file)?.is_file() {
|
||||
return Err(SnapshotNewFromDirError::IncompleteDir(completion_flag_file));
|
||||
}
|
||||
|
||||
let bank_snapshot_post_path = bank_snapshot_dir.join(get_snapshot_file_name(slot));
|
||||
let bank_snapshot_pre_path =
|
||||
bank_snapshot_post_path.with_extension(BANK_SNAPSHOT_PRE_FILENAME_EXTENSION);
|
||||
|
||||
if bank_snapshot_pre_path.is_file() {
|
||||
return Some(BankSnapshotInfo {
|
||||
slot,
|
||||
snapshot_type: BankSnapshotType::Pre,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
});
|
||||
}
|
||||
let snapshot_type = if bank_snapshot_pre_path.is_file() {
|
||||
BankSnapshotType::Pre
|
||||
} else if bank_snapshot_post_path.is_file() {
|
||||
BankSnapshotType::Post
|
||||
} else {
|
||||
return Err(SnapshotNewFromDirError::MissingSnapshotFile(
|
||||
bank_snapshot_dir,
|
||||
));
|
||||
};
|
||||
|
||||
if bank_snapshot_post_path.is_file() {
|
||||
return Some(BankSnapshotInfo {
|
||||
slot,
|
||||
snapshot_type: BankSnapshotType::Post,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
});
|
||||
}
|
||||
|
||||
None
|
||||
Ok(BankSnapshotInfo {
|
||||
slot,
|
||||
snapshot_type,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
snapshot_version,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn snapshot_path(&self) -> PathBuf {
|
||||
|
@ -294,12 +323,42 @@ pub enum SnapshotError {
|
|||
#[error("snapshot slot deltas are invalid: {0}")]
|
||||
VerifySlotDeltas(#[from] VerifySlotDeltasError),
|
||||
|
||||
#[error("bank_snapshot_info new_from_dir failed: {0}")]
|
||||
NewFromDir(#[from] SnapshotNewFromDirError),
|
||||
|
||||
#[error("invalid AppendVec path: {}", .0.display())]
|
||||
InvalidAppendVecPath(PathBuf),
|
||||
|
||||
#[error("invalid account path: {}", .0.display())]
|
||||
InvalidAccountPath(PathBuf),
|
||||
|
||||
#[error("no valid snapshot dir found under {}", .0.display())]
|
||||
NoSnapshotSlotDir(PathBuf),
|
||||
}
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SnapshotNewFromDirError {
|
||||
#[error("I/O error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("invalid bank snapshot directory {}", .0.display())]
|
||||
InvalidBankSnapshotDir(PathBuf),
|
||||
|
||||
#[error("missing status cache file {}", .0.display())]
|
||||
MissingStatusCacheFile(PathBuf),
|
||||
|
||||
#[error("missing version file {}", .0.display())]
|
||||
MissingVersionFile(PathBuf),
|
||||
|
||||
#[error("invalid snapshot version")]
|
||||
InvalidVersion,
|
||||
|
||||
#[error("snapshot directory incomplete {}", .0.display())]
|
||||
IncompleteDir(PathBuf),
|
||||
|
||||
#[error("missing snapshot file {}", .0.display())]
|
||||
MissingSnapshotFile(PathBuf),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||
|
||||
/// Errors that can happen in `verify_slot_deltas()`
|
||||
|
@ -644,13 +703,16 @@ pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnaps
|
|||
.and_then(|file_name| file_name.parse::<Slot>().ok())
|
||||
})
|
||||
})
|
||||
.for_each(|slot| {
|
||||
if let Some(snapshot_info) =
|
||||
BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot)
|
||||
{
|
||||
bank_snapshots.push(snapshot_info);
|
||||
}
|
||||
}),
|
||||
.for_each(
|
||||
|slot| match BankSnapshotInfo::new_from_dir(&bank_snapshots_dir, slot) {
|
||||
Ok(snapshot_info) => {
|
||||
bank_snapshots.push(snapshot_info);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unable to read bank snapshot for slot {}: {}", slot, err);
|
||||
}
|
||||
},
|
||||
),
|
||||
}
|
||||
bank_snapshots
|
||||
}
|
||||
|
@ -691,6 +753,13 @@ pub fn get_highest_bank_snapshot_post(
|
|||
do_get_highest_bank_snapshot(get_bank_snapshots_post(bank_snapshots_dir))
|
||||
}
|
||||
|
||||
/// Get the bank snapshot with the highest slot in a directory
|
||||
///
|
||||
/// This function gets the highest bank snapshot of any type
|
||||
pub fn get_highest_bank_snapshot(bank_snapshots_dir: impl AsRef<Path>) -> Option<BankSnapshotInfo> {
|
||||
do_get_highest_bank_snapshot(get_bank_snapshots(&bank_snapshots_dir))
|
||||
}
|
||||
|
||||
fn do_get_highest_bank_snapshot(
|
||||
mut bank_snapshots: Vec<BankSnapshotInfo>,
|
||||
) -> Option<BankSnapshotInfo> {
|
||||
|
@ -1102,6 +1171,7 @@ pub fn add_bank_snapshot(
|
|||
slot,
|
||||
snapshot_type: BankSnapshotType::Pre,
|
||||
snapshot_dir: bank_snapshot_dir,
|
||||
snapshot_version,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -1548,6 +1618,39 @@ fn streaming_unarchive_snapshot(
|
|||
.collect()
|
||||
}
|
||||
|
||||
/// BankSnapshotInfo::new_from_dir() requires a few meta files to accept a snapshot dir
|
||||
/// as a valid one. A dir unpacked from an archive lacks these files. Fill them here to
|
||||
/// allow new_from_dir() checks to pass. These checks are not needed for unpacked dirs,
|
||||
/// but it is not clean to add another flag to new_from_dir() to skip them.
|
||||
fn create_snapshot_meta_files_for_unarchived_snapshot(unpack_dir: impl AsRef<Path>) -> Result<()> {
|
||||
let snapshots_dir = unpack_dir.as_ref().join("snapshots");
|
||||
if !snapshots_dir.is_dir() {
|
||||
return Err(SnapshotError::NoSnapshotSlotDir(snapshots_dir));
|
||||
}
|
||||
|
||||
// The unpacked dir has a single slot dir, which is the snapshot slot dir.
|
||||
let slot_dir = fs::read_dir(&snapshots_dir)
|
||||
.map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
|
||||
.find(|entry| entry.as_ref().unwrap().path().is_dir())
|
||||
.ok_or_else(|| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
|
||||
.map_err(|_| SnapshotError::NoSnapshotSlotDir(snapshots_dir.clone()))?
|
||||
.path();
|
||||
|
||||
let version_file = unpack_dir.as_ref().join(SNAPSHOT_VERSION_FILENAME);
|
||||
fs::hard_link(version_file, slot_dir.join(SNAPSHOT_VERSION_FILENAME))?;
|
||||
|
||||
let status_cache_file = snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
fs::hard_link(
|
||||
status_cache_file,
|
||||
slot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME),
|
||||
)?;
|
||||
|
||||
let state_complete_file = slot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
fs::File::create(state_complete_file)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary
|
||||
/// directories, untaring, reading the version file, and then returning those fields plus the
|
||||
/// rebuilt storage
|
||||
|
@ -1593,6 +1696,8 @@ where
|
|||
);
|
||||
info!("{}", measure_untar);
|
||||
|
||||
create_snapshot_meta_files_for_unarchived_snapshot(&unpack_dir)?;
|
||||
|
||||
let RebuiltSnapshotStorage {
|
||||
snapshot_version,
|
||||
storage,
|
||||
|
@ -3069,6 +3174,16 @@ mod tests {
|
|||
let snapshot_filename = get_snapshot_file_name(slot);
|
||||
let snapshot_path = snapshot_dir.join(snapshot_filename);
|
||||
File::create(snapshot_path).unwrap();
|
||||
|
||||
let status_cache_file = snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
File::create(status_cache_file).unwrap();
|
||||
|
||||
let version_path = snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
write_snapshot_version_file(version_path, SnapshotVersion::default()).unwrap();
|
||||
|
||||
// Mark this directory complete so it can be used. Check this flag first before selecting for deserialization.
|
||||
let state_complete_path = snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
fs::File::create(state_complete_path).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4522,4 +4637,55 @@ mod tests {
|
|||
|
||||
assert!(matches!(ret, Err(SnapshotError::InvalidAppendVecPath(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_highest_bank_snapshot() {
|
||||
solana_logger::setup();
|
||||
|
||||
let genesis_config = GenesisConfig::default();
|
||||
let mut bank = Arc::new(Bank::new_for_tests(&genesis_config));
|
||||
|
||||
let tmp_dir = tempfile::TempDir::new().unwrap();
|
||||
let bank_snapshots_dir = tmp_dir.path();
|
||||
let collecter_id = Pubkey::new_unique();
|
||||
let snapshot_version = SnapshotVersion::default();
|
||||
|
||||
for _ in 0..4 {
|
||||
// prepare the bank
|
||||
bank = Arc::new(Bank::new_from_parent(&bank, &collecter_id, bank.slot() + 1));
|
||||
bank.fill_bank_with_ticks_for_tests();
|
||||
bank.squash();
|
||||
bank.force_flush_accounts_cache();
|
||||
|
||||
// generate the bank snapshot directory for slot+1
|
||||
let snapshot_storages = bank.get_snapshot_storages(None);
|
||||
let slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
|
||||
add_bank_snapshot(
|
||||
bank_snapshots_dir,
|
||||
&bank,
|
||||
&snapshot_storages,
|
||||
snapshot_version,
|
||||
slot_deltas,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
|
||||
assert_eq!(snapshot.slot, 4);
|
||||
|
||||
let complete_flag_file = snapshot.snapshot_dir.join(SNAPSHOT_STATE_COMPLETE_FILENAME);
|
||||
fs::remove_file(complete_flag_file).unwrap();
|
||||
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
|
||||
assert_eq!(snapshot.slot, 3);
|
||||
|
||||
let snapshot_version_file = snapshot.snapshot_dir.join(SNAPSHOT_VERSION_FILENAME);
|
||||
fs::remove_file(snapshot_version_file).unwrap();
|
||||
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
|
||||
assert_eq!(snapshot.slot, 2);
|
||||
|
||||
let status_cache_file = snapshot.snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILENAME);
|
||||
fs::remove_file(status_cache_file).unwrap();
|
||||
let snapshot = get_highest_bank_snapshot(bank_snapshots_dir).unwrap();
|
||||
assert_eq!(snapshot.slot, 1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue