Cleanup error type mapping in serde_snapshot (#10580)
* Cleanup error type mapping in serde_snapshot * Fail when account storage entry files cannot be moved from snapshot
This commit is contained in:
parent
540ac9eb6b
commit
50c93d4441
|
@ -88,7 +88,7 @@ pub enum SnapshotError {
|
|||
IO(#[from] std::io::Error),
|
||||
|
||||
#[error("serialization error")]
|
||||
Serialize(#[from] Box<bincode::ErrorKind>),
|
||||
Serialize(#[from] bincode::Error),
|
||||
|
||||
#[error("file system error")]
|
||||
FsExtra(#[from] fs_extra::error::Error),
|
||||
|
|
|
@ -7,7 +7,7 @@ use {
|
|||
append_vec::AppendVec,
|
||||
bank::BankRc,
|
||||
},
|
||||
bincode::{deserialize_from, serialize_into},
|
||||
bincode::{deserialize_from, serialize_into, Error},
|
||||
fs_extra::dir::CopyOptions,
|
||||
log::{info, warn},
|
||||
rand::{thread_rng, Rng},
|
||||
|
@ -20,9 +20,7 @@ use {
|
|||
cmp::min,
|
||||
collections::HashMap,
|
||||
fmt::{Formatter, Result as FormatResult},
|
||||
io::{
|
||||
BufReader, BufWriter, Cursor, Error as IoError, ErrorKind as IoErrorKind, Read, Write,
|
||||
},
|
||||
io::{BufReader, BufWriter, Cursor, Read, Write},
|
||||
path::{Path, PathBuf},
|
||||
result::Result,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
|
@ -79,32 +77,20 @@ pub trait TypeContext<'a> {
|
|||
|
||||
fn deserialize_accounts_db_fields<R>(
|
||||
stream: &mut BufReader<R>,
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, Error>
|
||||
where
|
||||
R: Read;
|
||||
|
||||
// we might define fn (de)serialize_bank(...) -> Result<Bank,...> for versionized bank serialization in the future
|
||||
}
|
||||
|
||||
fn bankrc_to_io_error<T: ToString>(error: T) -> IoError {
|
||||
let msg = error.to_string();
|
||||
warn!("BankRc error: {:?}", msg);
|
||||
IoError::new(IoErrorKind::Other, msg)
|
||||
}
|
||||
|
||||
fn accountsdb_to_io_error<T: ToString>(error: T) -> IoError {
|
||||
let msg = error.to_string();
|
||||
warn!("AccountsDB error: {:?}", msg);
|
||||
IoError::new(IoErrorKind::Other, msg)
|
||||
}
|
||||
|
||||
pub fn bankrc_from_stream<R, P>(
|
||||
serde_style: SerdeStyle,
|
||||
account_paths: &[PathBuf],
|
||||
slot: Slot,
|
||||
stream: &mut BufReader<R>,
|
||||
stream_append_vecs_path: P,
|
||||
) -> std::result::Result<BankRc, IoError>
|
||||
) -> std::result::Result<BankRc, Error>
|
||||
where
|
||||
R: Read,
|
||||
P: AsRef<Path>,
|
||||
|
@ -125,6 +111,10 @@ where
|
|||
SerdeStyle::NEWER => INTO!(TypeContextFuture),
|
||||
SerdeStyle::OLDER => INTO!(TypeContextLegacy),
|
||||
}
|
||||
.map_err(|err| {
|
||||
warn!("bankrc_from_stream error: {:?}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
pub fn bankrc_to_stream<W>(
|
||||
|
@ -132,7 +122,7 @@ pub fn bankrc_to_stream<W>(
|
|||
stream: &mut BufWriter<W>,
|
||||
bank_rc: &BankRc,
|
||||
snapshot_storages: &[SnapshotStorage],
|
||||
) -> Result<(), IoError>
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
W: Write,
|
||||
{
|
||||
|
@ -146,13 +136,16 @@ where
|
|||
phantom: std::marker::PhantomData::default(),
|
||||
},
|
||||
)
|
||||
.map_err(bankrc_to_io_error)
|
||||
};
|
||||
}
|
||||
match serde_style {
|
||||
SerdeStyle::NEWER => INTO!(TypeContextFuture),
|
||||
SerdeStyle::OLDER => INTO!(TypeContextLegacy),
|
||||
}
|
||||
.map_err(|err| {
|
||||
warn!("bankrc_to_stream error: {:?}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
|
||||
pub struct SerializableBankRc<'a, C> {
|
||||
|
@ -190,7 +183,7 @@ fn context_accountsdb_from_fields<'a, C, P>(
|
|||
account_db_fields: AccountDBFields<C::SerializableAccountStorageEntry>,
|
||||
account_paths: &[PathBuf],
|
||||
stream_append_vecs_path: P,
|
||||
) -> Result<AccountsDB, IoError>
|
||||
) -> Result<AccountsDB, Error>
|
||||
where
|
||||
C: TypeContext<'a>,
|
||||
P: AsRef<Path>,
|
||||
|
@ -241,38 +234,23 @@ where
|
|||
.as_ref()
|
||||
.join(&append_vec_relative_path);
|
||||
let target = local_dir.join(append_vec_abs_path.file_name().unwrap());
|
||||
if std::fs::rename(append_vec_abs_path.clone(), target).is_err() {
|
||||
std::fs::rename(append_vec_abs_path.clone(), target).or_else(|_| {
|
||||
let mut copy_options = CopyOptions::new();
|
||||
copy_options.overwrite = true;
|
||||
let e = fs_extra::move_items(
|
||||
&vec![&append_vec_abs_path],
|
||||
&local_dir,
|
||||
©_options,
|
||||
)
|
||||
.map_err(|e| {
|
||||
format!(
|
||||
"unable to move {:?} to {:?}: {}",
|
||||
append_vec_abs_path, local_dir, e
|
||||
)
|
||||
})
|
||||
.map_err(accountsdb_to_io_error);
|
||||
if e.is_err() {
|
||||
info!("{:?}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
fs_extra::move_items(&vec![&append_vec_abs_path], &local_dir, ©_options)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
|
||||
.and(Ok(()))
|
||||
})?;
|
||||
|
||||
// Notify the AppendVec of the new file location
|
||||
let local_path = local_dir.join(append_vec_relative_path);
|
||||
let mut u_storage_entry = Arc::try_unwrap(storage_entry).unwrap();
|
||||
u_storage_entry
|
||||
.set_file(local_path)
|
||||
.map_err(accountsdb_to_io_error)?;
|
||||
u_storage_entry.set_file(local_path)?;
|
||||
new_slot_storage.insert(id, Arc::new(u_storage_entry));
|
||||
}
|
||||
Ok((slot, new_slot_storage))
|
||||
})
|
||||
.collect::<Result<HashMap<Slot, _>, IoError>>()?;
|
||||
.collect::<Result<HashMap<Slot, _>, Error>>()?;
|
||||
|
||||
// discard any slots with no storage entries
|
||||
// this can happen if a non-root slot was serialized
|
||||
|
|
|
@ -92,10 +92,10 @@ impl<'a> TypeContext<'a> for Context {
|
|||
|
||||
fn deserialize_accounts_db_fields<R>(
|
||||
mut stream: &mut BufReader<R>,
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, Error>
|
||||
where
|
||||
R: Read,
|
||||
{
|
||||
deserialize_from(&mut stream).map_err(accountsdb_to_io_error)
|
||||
deserialize_from(&mut stream)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -173,36 +173,25 @@ impl<'a> TypeContext<'a> for Context {
|
|||
|
||||
fn deserialize_accounts_db_fields<R>(
|
||||
mut stream: &mut BufReader<R>,
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, IoError>
|
||||
) -> Result<AccountDBFields<Self::SerializableAccountStorageEntry>, Error>
|
||||
where
|
||||
R: Read,
|
||||
{
|
||||
// read and discard two u64 byte vector lengths
|
||||
let serialized_len = MAX_ACCOUNTS_DB_STREAM_SIZE;
|
||||
let serialized_len = min(
|
||||
serialized_len,
|
||||
deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?,
|
||||
);
|
||||
let serialized_len = min(
|
||||
serialized_len,
|
||||
deserialize_from(&mut stream).map_err(accountsdb_to_io_error)?,
|
||||
);
|
||||
let serialized_len = min(serialized_len, deserialize_from(&mut stream)?);
|
||||
let serialized_len = min(serialized_len, deserialize_from(&mut stream)?);
|
||||
|
||||
// (1st of 3 elements) read in map of slots to account storage entries
|
||||
let storage: HashMap<Slot, Vec<Self::SerializableAccountStorageEntry>> = bincode::config()
|
||||
.limit(serialized_len)
|
||||
.deserialize_from(&mut stream)
|
||||
.map_err(accountsdb_to_io_error)?;
|
||||
.deserialize_from(&mut stream)?;
|
||||
|
||||
// (2nd of 3 elements) read in write version
|
||||
let version: u64 = deserialize_from(&mut stream)
|
||||
.map_err(|e| format!("write version deserialize error: {}", e.to_string()))
|
||||
.map_err(accountsdb_to_io_error)?;
|
||||
let version: u64 = deserialize_from(&mut stream)?;
|
||||
|
||||
// (3rd of 3 elements) read in (slot, bank hashes) pair
|
||||
let (slot, bank_hash_info): (Slot, BankHashInfo) = deserialize_from(&mut stream)
|
||||
.map_err(|e| format!("bank hashes deserialize error: {}", e.to_string()))
|
||||
.map_err(accountsdb_to_io_error)?;
|
||||
let (slot, bank_hash_info): (Slot, BankHashInfo) = deserialize_from(&mut stream)?;
|
||||
|
||||
Ok(AccountDBFields(storage, version, slot, bank_hash_info))
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ fn context_accountsdb_from_stream<'a, C, R, P>(
|
|||
stream: &mut BufReader<R>,
|
||||
account_paths: &[PathBuf],
|
||||
stream_append_vecs_path: P,
|
||||
) -> Result<AccountsDB, IoError>
|
||||
) -> Result<AccountsDB, Error>
|
||||
where
|
||||
C: TypeContext<'a>,
|
||||
R: Read,
|
||||
|
@ -77,7 +77,7 @@ fn accountsdb_from_stream<R, P>(
|
|||
stream: &mut BufReader<R>,
|
||||
account_paths: &[PathBuf],
|
||||
stream_append_vecs_path: P,
|
||||
) -> Result<AccountsDB, IoError>
|
||||
) -> Result<AccountsDB, Error>
|
||||
where
|
||||
R: Read,
|
||||
P: AsRef<Path>,
|
||||
|
@ -103,7 +103,7 @@ fn accountsdb_to_stream<W>(
|
|||
accounts_db: &AccountsDB,
|
||||
slot: Slot,
|
||||
account_storage_entries: &[SnapshotStorage],
|
||||
) -> Result<(), IoError>
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
W: Write,
|
||||
{
|
||||
|
@ -116,8 +116,7 @@ where
|
|||
account_storage_entries,
|
||||
phantom: std::marker::PhantomData::default(),
|
||||
},
|
||||
)
|
||||
.map_err(bankrc_to_io_error),
|
||||
),
|
||||
SerdeStyle::OLDER => serialize_into(
|
||||
stream,
|
||||
&SerializableAccountsDB::<TypeContextLegacy> {
|
||||
|
@ -126,8 +125,7 @@ where
|
|||
account_storage_entries,
|
||||
phantom: std::marker::PhantomData::default(),
|
||||
},
|
||||
)
|
||||
.map_err(bankrc_to_io_error),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue