diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 2536aaca5f..660a206e4f 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -72,7 +72,7 @@ pub type CompletedSlotsReceiver = Receiver>; #[derive(Debug)] pub enum BlocktreeError { BlobForIndexExists, - InvalidBlobData, + InvalidBlobData(Box), RocksDb(rocksdb::Error), #[cfg(feature = "kvstore")] KvsDb(kvstore::Error), @@ -801,7 +801,9 @@ impl Blocktree { max_entries, )?; let num = consecutive_blobs.len(); - Ok((deserialize_blobs(&consecutive_blobs), num)) + let blobs = + deserialize_blobs(&consecutive_blobs).map_err(BlocktreeError::InvalidBlobData)?; + Ok((blobs, num)) } // Returns slots connecting to any element of the list `slots`. @@ -831,7 +833,7 @@ impl Blocktree { Ok(result) } - pub fn deserialize_blob_data(data: &[u8]) -> Result> { + pub fn deserialize_blob_data(data: &[u8]) -> bincode::Result> { let entries = deserialize(data)?; Ok(entries) } @@ -1525,18 +1527,27 @@ fn recover( Ok((recovered_data, recovered_coding)) } -fn deserialize_blobs(blob_datas: &[I]) -> Vec +fn deserialize_blobs(blob_datas: &[I]) -> bincode::Result> where I: Borrow<[u8]>, { - blob_datas - .iter() - .flat_map(|blob_data| { - let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..]; - Blocktree::deserialize_blob_data(serialized_entries_data) - .expect("Ledger should only contain well formed data") - }) - .collect() + let blob_results = blob_datas.iter().map(|blob_data| { + let serialized_entries_data = &blob_data.borrow()[BLOB_HEADER_SIZE..]; + Blocktree::deserialize_blob_data(serialized_entries_data) + }); + + let mut entries = vec![]; + + // We want to early exit in this loop to prevent needless work if any blob is corrupted. + // However, there's no way to early exit from a flat_map, so we're flattening manually + // instead of calling map().flatten() to avoid allocating a vector for the map results above, + // and then allocating another vector for flattening the results + for r in blob_results { + let blob_entries = r?; + entries.extend(blob_entries); + } + + Ok(entries) } fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option) -> bool { @@ -3445,6 +3456,23 @@ pub mod tests { assert!(recovered_blobs_opt.is_none()); } + #[test] + fn test_deserialize_corrupted_blob() { + let path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&path).unwrap(); + let (mut blobs, _) = make_slot_entries(0, 0, 1); + { + let blob0 = &mut blobs[0]; + // corrupt the size + blob0.set_size(BLOB_HEADER_SIZE); + } + blocktree.insert_data_blobs(&blobs).unwrap(); + assert_matches!( + blocktree.get_slot_entries(0, 0, None), + Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) + ); + } + #[test] fn test_recovery_multi_slot_multi_thread() { use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng}; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b82c62f4ec..4904c38f09 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1,7 +1,7 @@ //! The `replay_stage` replays transactions broadcast by the leader. use crate::bank_forks::BankForks; -use crate::blocktree::Blocktree; +use crate::blocktree::{Blocktree, BlocktreeError}; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; use crate::consensus::{StakeLockout, Tower}; @@ -283,6 +283,7 @@ impl ReplayStage { !Bank::can_commit(&tx_error) } Err(Error::BlobError(BlobError::VerificationFailed)) => true, + Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) => true, _ => false, } } @@ -292,10 +293,17 @@ impl ReplayStage { blocktree: &Blocktree, progress: &mut HashMap, ) -> Result<()> { - let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?; - let result = Self::replay_entries_into_bank(bank, entries, progress, num); + let result = + Self::load_blocktree_entries(bank, blocktree, progress).and_then(|(entries, num)| { + Self::replay_entries_into_bank(bank, entries, progress, num) + }); if Self::is_replay_result_fatal(&result) { + warn!( + "Fatal replay result in slot: {}, result: {:?}", + bank.slot(), + result + ); Self::mark_dead_slot(bank.slot(), blocktree, progress); } @@ -663,11 +671,10 @@ mod test { use crate::blocktree::get_tmp_ledger_path; use crate::entry; use crate::genesis_utils::create_genesis_block; - use crate::packet::Blob; + use crate::packet::{Blob, BLOB_HEADER_SIZE}; use crate::replay_stage::ReplayStage; use solana_runtime::genesis_utils::GenesisBlockInfo; - use solana_sdk::hash::hash; - use solana_sdk::hash::Hash; + use solana_sdk::hash::{hash, Hash}; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use solana_sdk::transaction::TransactionError; @@ -731,53 +738,110 @@ mod test { } #[test] - fn test_dead_forks() { - let ledger_path = get_tmp_ledger_path!(); - { - let blocktree = Arc::new( - Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), - ); - let GenesisBlockInfo { - genesis_block, - mint_keypair, - .. - } = create_genesis_block(1000); - let bank0 = Arc::new(Bank::new(&genesis_block)); - let mut progress = HashMap::new(); - progress.insert(bank0.slot(), ForkProgress::new(bank0.last_blockhash())); + fn test_dead_fork_transaction_error() { + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let missing_keypair = Keypair::new(); + let missing_keypair2 = Keypair::new(); - let keypair1 = Keypair::new(); - let keypair2 = Keypair::new(); - let missing_keypair = Keypair::new(); - - // Insert entry with TransactionError::AccountNotFound error - let account_not_found_blob = entry::next_entry( - &bank0.last_blockhash(), + let res = check_dead_fork(|blockhash| { + entry::next_entry( + blockhash, 1, vec![ system_transaction::create_user_account( &keypair1, &keypair2.pubkey(), 2, - bank0.last_blockhash(), + *blockhash, ), // should be fine, system_transaction::transfer( &missing_keypair, - &mint_keypair.pubkey(), + &missing_keypair2.pubkey(), 2, - bank0.last_blockhash(), + *blockhash, ), // should cause AccountNotFound error ], ) - .to_blob(); + .to_blob() + }); - blocktree - .insert_data_blobs(&[account_not_found_blob]) - .unwrap(); - assert_matches!( - ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress), - Err(Error::TransactionError(TransactionError::AccountNotFound)) + assert_matches!( + res, + Err(Error::TransactionError(TransactionError::AccountNotFound)) + ); + } + + #[test] + fn test_dead_fork_entry_verification_failure() { + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let res = check_dead_fork(|blockhash| { + let bad_hash = hash(&[2; 30]); + entry::next_entry( + // User wrong blockhash so that the the entry causes an entry verification failure + &bad_hash, + 1, + vec![system_transaction::create_user_account( + &keypair1, + &keypair2.pubkey(), + 2, + *blockhash, + )], + ) + .to_blob() + }); + + assert_matches!(res, Err(Error::BlobError(BlobError::VerificationFailed))); + } + + #[test] + fn test_dead_fork_blob_deserialize_failure() { + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + // Insert entry that causes blob deserialization failure + + let res = check_dead_fork(|blockhash| { + let mut b = entry::next_entry( + &blockhash, + 1, + vec![system_transaction::create_user_account( + &keypair1, + &keypair2.pubkey(), + 2, + *blockhash, + )], + ) + .to_blob(); + b.set_size(BLOB_HEADER_SIZE); + b + }); + + assert_matches!( + res, + Err(Error::BlocktreeError(BlocktreeError::InvalidBlobData(_))) + ); + } + + // Given a blob and a fatal expected error, check that replaying that blob causes causes the fork to be + // marked as dead. Returns the error for caller to verify. + fn check_dead_fork(blob_to_insert: F) -> Result<()> + where + F: Fn(&Hash) -> Blob, + { + let ledger_path = get_tmp_ledger_path!(); + let res = { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), ); + let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); + let bank0 = Arc::new(Bank::new(&genesis_block)); + let mut progress = HashMap::new(); + let last_blockhash = bank0.last_blockhash(); + progress.insert(bank0.slot(), ForkProgress::new(last_blockhash)); + let blob = blob_to_insert(&last_blockhash); + blocktree.insert_data_blobs(&[blob]).unwrap(); + let res = ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress); // Check that the erroring bank was marked as dead in the progress map assert!(progress @@ -787,46 +851,9 @@ mod test { // Check that the erroring bank was marked as dead in blocktree assert!(blocktree.is_dead(bank0.slot())); - - // Create new bank - let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.slot() + 1); - progress.insert(bank1.slot(), ForkProgress::new(bank0.last_blockhash())); - let bad_hash = hash(&[2; 30]); - - // Insert entry that causes verification failure - let mut verifcation_failure_blob = entry::next_entry( - // use wrong blockhash - &bad_hash, - 1, - vec![system_transaction::create_user_account( - &keypair1, - &keypair2.pubkey(), - 2, - bank1.last_blockhash(), - )], - ) - .to_blob(); - verifcation_failure_blob.set_slot(1); - verifcation_failure_blob.set_index(0); - verifcation_failure_blob.set_parent(bank0.slot()); - - blocktree - .insert_data_blobs(&[verifcation_failure_blob]) - .unwrap(); - assert_matches!( - ReplayStage::replay_blocktree_into_bank(&bank1, &blocktree, &mut progress), - Err(Error::BlobError(BlobError::VerificationFailed)) - ); - // Check that the erroring bank was marked as dead in the progress map - assert!(progress - .get(&bank1.slot()) - .map(|b| b.is_dead) - .unwrap_or(false)); - - // Check that the erroring bank was marked as dead in blocktree - assert!(blocktree.is_dead(bank1.slot())); - } - + res + }; let _ignored = remove_dir_all(&ledger_path); + res } }