Ensure blobs are deserializable without unwrapping (#4948)
* Return result from deserializing blobs in blocktree instead of assuming deserialization will succeed * Mark bad deserialization as dead fork * Add test for corrupted blobs in blocktree and replay_stage
This commit is contained in:
parent
fc180f4cbf
commit
2aac094f63
|
@ -72,7 +72,7 @@ pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
|
|||
#[derive(Debug)]
|
||||
pub enum BlocktreeError {
|
||||
BlobForIndexExists,
|
||||
InvalidBlobData,
|
||||
InvalidBlobData(Box<bincode::ErrorKind>),
|
||||
RocksDb(rocksdb::Error),
|
||||
#[cfg(feature = "kvstore")]
|
||||
KvsDb(kvstore::Error),
|
||||
|
@ -801,7 +801,9 @@ impl Blocktree {
|
|||
max_entries,
|
||||
)?;
|
||||
let num = consecutive_blobs.len();
|
||||
Ok((deserialize_blobs(&consecutive_blobs), num))
|
||||
let blobs =
|
||||
deserialize_blobs(&consecutive_blobs).map_err(BlocktreeError::InvalidBlobData)?;
|
||||
Ok((blobs, num))
|
||||
}
|
||||
|
||||
// Returns slots connecting to any element of the list `slots`.
|
||||
|
@ -831,7 +833,7 @@ impl Blocktree {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn deserialize_blob_data(data: &[u8]) -> Result<Vec<Entry>> {
|
||||
pub fn deserialize_blob_data(data: &[u8]) -> bincode::Result<Vec<Entry>> {
|
||||
let entries = deserialize(data)?;
|
||||
Ok(entries)
|
||||
}
|
||||
|
@ -1525,18 +1527,27 @@ fn recover(
|
|||
Ok((recovered_data, recovered_coding))
|
||||
}
|
||||
|
||||
fn deserialize_blobs<I>(blob_datas: &[I]) -> Vec<Entry>
|
||||
fn deserialize_blobs<I>(blob_datas: &[I]) -> bincode::Result<Vec<Entry>>
|
||||
where
|
||||
I: Borrow<[u8]>,
|
||||
{
|
||||
blob_datas
|
||||
.iter()
|
||||
.flat_map(|blob_data| {
|
||||
let blob_results = blob_datas.iter().map(|blob_data| {
|
||||
let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..];
|
||||
Blocktree::deserialize_blob_data(serialized_entries_data)
|
||||
.expect("Ledger should only contain well formed data")
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
|
||||
let mut entries = vec![];
|
||||
|
||||
// We want to early exit in this loop to prevent needless work if any blob is corrupted.
|
||||
// However, there's no way to early exit from a flat_map, so we're flattening manually
|
||||
// instead of calling map().flatten() to avoid allocating a vector for the map results above,
|
||||
// and then allocating another vector for flattening the results
|
||||
for r in blob_results {
|
||||
let blob_entries = r?;
|
||||
entries.extend(blob_entries);
|
||||
}
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -> bool {
|
||||
|
@ -3445,6 +3456,23 @@ pub mod tests {
|
|||
assert!(recovered_blobs_opt.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_corrupted_blob() {
|
||||
let path = get_tmp_ledger_path!();
|
||||
let blocktree = Blocktree::open(&path).unwrap();
|
||||
let (mut blobs, _) = make_slot_entries(0, 0, 1);
|
||||
{
|
||||
let blob0 = &mut blobs[0];
|
||||
// corrupt the size
|
||||
blob0.set_size(BLOB_HEADER_SIZE);
|
||||
}
|
||||
blocktree.insert_data_blobs(&blobs).unwrap();
|
||||
assert_matches!(
|
||||
blocktree.get_slot_entries(0, 0, None),
|
||||
Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_)))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recovery_multi_slot_multi_thread() {
|
||||
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! The `replay_stage` replays transactions broadcast by the leader.
|
||||
|
||||
use crate::bank_forks::BankForks;
|
||||
use crate::blocktree::Blocktree;
|
||||
use crate::blocktree::{Blocktree, BlocktreeError};
|
||||
use crate::blocktree_processor;
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::consensus::{StakeLockout, Tower};
|
||||
|
@ -283,6 +283,7 @@ impl ReplayStage {
|
|||
!Bank::can_commit(&tx_error)
|
||||
}
|
||||
Err(Error::BlobError(BlobError::VerificationFailed)) => true,
|
||||
Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
@ -292,10 +293,17 @@ impl ReplayStage {
|
|||
blocktree: &Blocktree,
|
||||
progress: &mut HashMap<u64, ForkProgress>,
|
||||
) -> Result<()> {
|
||||
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
|
||||
let result = Self::replay_entries_into_bank(bank, entries, progress, num);
|
||||
let result =
|
||||
Self::load_blocktree_entries(bank, blocktree, progress).and_then(|(entries, num)| {
|
||||
Self::replay_entries_into_bank(bank, entries, progress, num)
|
||||
});
|
||||
|
||||
if Self::is_replay_result_fatal(&result) {
|
||||
warn!(
|
||||
"Fatal replay result in slot: {}, result: {:?}",
|
||||
bank.slot(),
|
||||
result
|
||||
);
|
||||
Self::mark_dead_slot(bank.slot(), blocktree, progress);
|
||||
}
|
||||
|
||||
|
@ -663,11 +671,10 @@ mod test {
|
|||
use crate::blocktree::get_tmp_ledger_path;
|
||||
use crate::entry;
|
||||
use crate::genesis_utils::create_genesis_block;
|
||||
use crate::packet::Blob;
|
||||
use crate::packet::{Blob, BLOB_HEADER_SIZE};
|
||||
use crate::replay_stage::ReplayStage;
|
||||
use solana_runtime::genesis_utils::GenesisBlockInfo;
|
||||
use solana_sdk::hash::hash;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::hash::{hash, Hash};
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
|
@ -731,53 +738,110 @@ mod test {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_dead_forks() {
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blocktree = Arc::new(
|
||||
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
|
||||
);
|
||||
let GenesisBlockInfo {
|
||||
genesis_block,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank0 = Arc::new(Bank::new(&genesis_block));
|
||||
let mut progress = HashMap::new();
|
||||
progress.insert(bank0.slot(), ForkProgress::new(bank0.last_blockhash()));
|
||||
|
||||
fn test_dead_fork_transaction_error() {
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let missing_keypair = Keypair::new();
|
||||
let missing_keypair2 = Keypair::new();
|
||||
|
||||
// Insert entry with TransactionError::AccountNotFound error
|
||||
let account_not_found_blob = entry::next_entry(
|
||||
&bank0.last_blockhash(),
|
||||
let res = check_dead_fork(|blockhash| {
|
||||
entry::next_entry(
|
||||
blockhash,
|
||||
1,
|
||||
vec![
|
||||
system_transaction::create_user_account(
|
||||
&keypair1,
|
||||
&keypair2.pubkey(),
|
||||
2,
|
||||
bank0.last_blockhash(),
|
||||
*blockhash,
|
||||
), // should be fine,
|
||||
system_transaction::transfer(
|
||||
&missing_keypair,
|
||||
&mint_keypair.pubkey(),
|
||||
&missing_keypair2.pubkey(),
|
||||
2,
|
||||
bank0.last_blockhash(),
|
||||
*blockhash,
|
||||
), // should cause AccountNotFound error
|
||||
],
|
||||
)
|
||||
.to_blob();
|
||||
.to_blob()
|
||||
});
|
||||
|
||||
blocktree
|
||||
.insert_data_blobs(&[account_not_found_blob])
|
||||
.unwrap();
|
||||
assert_matches!(
|
||||
ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress),
|
||||
res,
|
||||
Err(Error::TransactionError(TransactionError::AccountNotFound))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dead_fork_entry_verification_failure() {
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let res = check_dead_fork(|blockhash| {
|
||||
let bad_hash = hash(&[2; 30]);
|
||||
entry::next_entry(
|
||||
// User wrong blockhash so that the the entry causes an entry verification failure
|
||||
&bad_hash,
|
||||
1,
|
||||
vec![system_transaction::create_user_account(
|
||||
&keypair1,
|
||||
&keypair2.pubkey(),
|
||||
2,
|
||||
*blockhash,
|
||||
)],
|
||||
)
|
||||
.to_blob()
|
||||
});
|
||||
|
||||
assert_matches!(res, Err(Error::BlobError(BlobError::VerificationFailed)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dead_fork_blob_deserialize_failure() {
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
// Insert entry that causes blob deserialization failure
|
||||
|
||||
let res = check_dead_fork(|blockhash| {
|
||||
let mut b = entry::next_entry(
|
||||
&blockhash,
|
||||
1,
|
||||
vec![system_transaction::create_user_account(
|
||||
&keypair1,
|
||||
&keypair2.pubkey(),
|
||||
2,
|
||||
*blockhash,
|
||||
)],
|
||||
)
|
||||
.to_blob();
|
||||
b.set_size(BLOB_HEADER_SIZE);
|
||||
b
|
||||
});
|
||||
|
||||
assert_matches!(
|
||||
res,
|
||||
Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_)))
|
||||
);
|
||||
}
|
||||
|
||||
// Given a blob and a fatal expected error, check that replaying that blob causes causes the fork to be
|
||||
// marked as dead. Returns the error for caller to verify.
|
||||
fn check_dead_fork<F>(blob_to_insert: F) -> Result<()>
|
||||
where
|
||||
F: Fn(&Hash) -> Blob,
|
||||
{
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
let res = {
|
||||
let blocktree = Arc::new(
|
||||
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
|
||||
);
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
||||
let bank0 = Arc::new(Bank::new(&genesis_block));
|
||||
let mut progress = HashMap::new();
|
||||
let last_blockhash = bank0.last_blockhash();
|
||||
progress.insert(bank0.slot(), ForkProgress::new(last_blockhash));
|
||||
let blob = blob_to_insert(&last_blockhash);
|
||||
blocktree.insert_data_blobs(&[blob]).unwrap();
|
||||
let res = ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);
|
||||
|
||||
// Check that the erroring bank was marked as dead in the progress map
|
||||
assert!(progress
|
||||
|
@ -787,46 +851,9 @@ mod test {
|
|||
|
||||
// Check that the erroring bank was marked as dead in blocktree
|
||||
assert!(blocktree.is_dead(bank0.slot()));
|
||||
|
||||
// Create new bank
|
||||
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.slot() + 1);
|
||||
progress.insert(bank1.slot(), ForkProgress::new(bank0.last_blockhash()));
|
||||
let bad_hash = hash(&[2; 30]);
|
||||
|
||||
// Insert entry that causes verification failure
|
||||
let mut verifcation_failure_blob = entry::next_entry(
|
||||
// use wrong blockhash
|
||||
&bad_hash,
|
||||
1,
|
||||
vec![system_transaction::create_user_account(
|
||||
&keypair1,
|
||||
&keypair2.pubkey(),
|
||||
2,
|
||||
bank1.last_blockhash(),
|
||||
)],
|
||||
)
|
||||
.to_blob();
|
||||
verifcation_failure_blob.set_slot(1);
|
||||
verifcation_failure_blob.set_index(0);
|
||||
verifcation_failure_blob.set_parent(bank0.slot());
|
||||
|
||||
blocktree
|
||||
.insert_data_blobs(&[verifcation_failure_blob])
|
||||
.unwrap();
|
||||
assert_matches!(
|
||||
ReplayStage::replay_blocktree_into_bank(&bank1, &blocktree, &mut progress),
|
||||
Err(Error::BlobError(BlobError::VerificationFailed))
|
||||
);
|
||||
// Check that the erroring bank was marked as dead in the progress map
|
||||
assert!(progress
|
||||
.get(&bank1.slot())
|
||||
.map(|b| b.is_dead)
|
||||
.unwrap_or(false));
|
||||
|
||||
// Check that the erroring bank was marked as dead in blocktree
|
||||
assert!(blocktree.is_dead(bank1.slot()));
|
||||
}
|
||||
|
||||
res
|
||||
};
|
||||
let _ignored = remove_dir_all(&ledger_path);
|
||||
res
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue