Refactor SnapshotError related code (#31632)
#### Problem Part of the snapshot-related code uses io::Error while other parts use SnapshotError. #### Summary of Changes As SnapshotError can be created from io::Error, this PR makes snapshot-related code to use SnapshotError instead.
This commit is contained in:
parent
c79a6e74e2
commit
586fd407bf
|
@ -18,7 +18,9 @@ use {
|
||||||
rent_collector::RentCollector,
|
rent_collector::RentCollector,
|
||||||
runtime_config::RuntimeConfig,
|
runtime_config::RuntimeConfig,
|
||||||
serde_snapshot::storage::SerializableAccountStorageEntry,
|
serde_snapshot::storage::SerializableAccountStorageEntry,
|
||||||
snapshot_utils::{self, StorageAndNextAppendVecId, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION},
|
snapshot_utils::{
|
||||||
|
self, SnapshotError, StorageAndNextAppendVecId, BANK_SNAPSHOT_PRE_FILENAME_EXTENSION,
|
||||||
|
},
|
||||||
stakes::Stakes,
|
stakes::Stakes,
|
||||||
},
|
},
|
||||||
bincode::{self, config::Options, Error},
|
bincode::{self, config::Options, Error},
|
||||||
|
@ -626,7 +628,7 @@ pub(crate) fn reconstruct_single_storage(
|
||||||
append_vec_path: &Path,
|
append_vec_path: &Path,
|
||||||
current_len: usize,
|
current_len: usize,
|
||||||
append_vec_id: AppendVecId,
|
append_vec_id: AppendVecId,
|
||||||
) -> io::Result<Arc<AccountStorageEntry>> {
|
) -> Result<Arc<AccountStorageEntry>, SnapshotError> {
|
||||||
let (accounts_file, num_accounts) =
|
let (accounts_file, num_accounts) =
|
||||||
AccountsFile::new_from_file(append_vec_path, current_len)
|
AccountsFile::new_from_file(append_vec_path, current_len)
|
||||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{}", err)))?;
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{}", err)))?;
|
||||||
|
@ -684,7 +686,7 @@ pub(crate) fn remap_and_reconstruct_single_storage(
|
||||||
append_vec_path: &Path,
|
append_vec_path: &Path,
|
||||||
next_append_vec_id: &AtomicAppendVecId,
|
next_append_vec_id: &AtomicAppendVecId,
|
||||||
num_collisions: &AtomicUsize,
|
num_collisions: &AtomicUsize,
|
||||||
) -> io::Result<Arc<AccountStorageEntry>> {
|
) -> Result<Arc<AccountStorageEntry>, SnapshotError> {
|
||||||
let (remapped_append_vec_id, remapped_append_vec_path) = remap_append_vec_file(
|
let (remapped_append_vec_id, remapped_append_vec_path) = remap_append_vec_file(
|
||||||
slot,
|
slot,
|
||||||
old_append_vec_id,
|
old_append_vec_id,
|
||||||
|
|
|
@ -99,8 +99,7 @@ impl SnapshotStorageRebuilder {
|
||||||
snapshot_storage_lengths,
|
snapshot_storage_lengths,
|
||||||
append_vec_files,
|
append_vec_files,
|
||||||
snapshot_from,
|
snapshot_from,
|
||||||
)
|
)?;
|
||||||
.map_err(|err| SnapshotError::IoWithSource(err, "rebuild snapshot storages"))?;
|
|
||||||
|
|
||||||
Ok(RebuiltSnapshotStorage {
|
Ok(RebuiltSnapshotStorage {
|
||||||
snapshot_version,
|
snapshot_version,
|
||||||
|
@ -207,7 +206,7 @@ impl SnapshotStorageRebuilder {
|
||||||
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
|
snapshot_storage_lengths: HashMap<Slot, HashMap<usize, usize>>,
|
||||||
append_vec_files: Vec<PathBuf>,
|
append_vec_files: Vec<PathBuf>,
|
||||||
snapshot_from: SnapshotFrom,
|
snapshot_from: SnapshotFrom,
|
||||||
) -> Result<AccountStorageMap, std::io::Error> {
|
) -> Result<AccountStorageMap, SnapshotError> {
|
||||||
let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
|
let rebuilder = Arc::new(SnapshotStorageRebuilder::new(
|
||||||
file_receiver,
|
file_receiver,
|
||||||
num_threads,
|
num_threads,
|
||||||
|
@ -236,17 +235,17 @@ impl SnapshotStorageRebuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Processes buffered append_vec_files
|
/// Processes buffered append_vec_files
|
||||||
fn process_buffered_files(&self, append_vec_files: Vec<PathBuf>) -> Result<(), std::io::Error> {
|
fn process_buffered_files(&self, append_vec_files: Vec<PathBuf>) -> Result<(), SnapshotError> {
|
||||||
append_vec_files
|
append_vec_files
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|path| self.process_append_vec_file(path))
|
.map(|path| self.process_append_vec_file(path))
|
||||||
.collect::<Result<(), std::io::Error>>()
|
.collect::<Result<(), SnapshotError>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn a single thread to process received append_vec_files
|
/// Spawn a single thread to process received append_vec_files
|
||||||
fn spawn_receiver_thread(
|
fn spawn_receiver_thread(
|
||||||
thread_pool: &ThreadPool,
|
thread_pool: &ThreadPool,
|
||||||
exit_sender: Sender<Result<(), std::io::Error>>,
|
exit_sender: Sender<Result<(), SnapshotError>>,
|
||||||
rebuilder: Arc<SnapshotStorageRebuilder>,
|
rebuilder: Arc<SnapshotStorageRebuilder>,
|
||||||
) {
|
) {
|
||||||
thread_pool.spawn(move || {
|
thread_pool.spawn(move || {
|
||||||
|
@ -269,7 +268,7 @@ impl SnapshotStorageRebuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process an append_vec_file
|
/// Process an append_vec_file
|
||||||
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), std::io::Error> {
|
fn process_append_vec_file(&self, path: PathBuf) -> Result<(), SnapshotError> {
|
||||||
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||||
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
|
if let Some(SnapshotFileKind::Storage) = get_snapshot_file_kind(&filename) {
|
||||||
let (slot, append_vec_id) = get_slot_and_append_vec_id(&filename);
|
let (slot, append_vec_id) = get_slot_and_append_vec_id(&filename);
|
||||||
|
@ -300,7 +299,7 @@ impl SnapshotStorageRebuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a slot that has received all storage entries
|
/// Process a slot that has received all storage entries
|
||||||
fn process_complete_slot(&self, slot: Slot) -> Result<(), std::io::Error> {
|
fn process_complete_slot(&self, slot: Slot) -> Result<(), SnapshotError> {
|
||||||
let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
|
let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
|
||||||
let lock = slot_storage_paths.lock().unwrap();
|
let lock = slot_storage_paths.lock().unwrap();
|
||||||
|
|
||||||
|
@ -335,7 +334,7 @@ impl SnapshotStorageRebuilder {
|
||||||
|
|
||||||
Ok((storage_entry.append_vec_id(), storage_entry))
|
Ok((storage_entry.append_vec_id(), storage_entry))
|
||||||
})
|
})
|
||||||
.collect::<Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, std::io::Error>>()?;
|
.collect::<Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, SnapshotError>>()?;
|
||||||
|
|
||||||
let storage = if slot_stores.len() > 1 {
|
let storage = if slot_stores.len() > 1 {
|
||||||
let remapped_append_vec_folder = lock.first().unwrap().parent().unwrap();
|
let remapped_append_vec_folder = lock.first().unwrap().parent().unwrap();
|
||||||
|
@ -388,8 +387,8 @@ impl SnapshotStorageRebuilder {
|
||||||
/// Wait for the completion of the rebuilding threads
|
/// Wait for the completion of the rebuilding threads
|
||||||
fn wait_for_completion(
|
fn wait_for_completion(
|
||||||
&self,
|
&self,
|
||||||
exit_receiver: Receiver<Result<(), std::io::Error>>,
|
exit_receiver: Receiver<Result<(), SnapshotError>>,
|
||||||
) -> Result<(), std::io::Error> {
|
) -> Result<(), SnapshotError> {
|
||||||
let num_slots = self.snapshot_storage_lengths.len();
|
let num_slots = self.snapshot_storage_lengths.len();
|
||||||
let mut last_log_time = Instant::now();
|
let mut last_log_time = Instant::now();
|
||||||
loop {
|
loop {
|
||||||
|
|
Loading…
Reference in New Issue