From 02058ea699813e142f141e7c0844eede8a8fc7c7 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Fri, 8 Nov 2019 20:21:54 -0500 Subject: [PATCH] Reject blocks with invalid last ticks in replay stage (#6833) * Reject blocks with invalid last ticks in replay stage * slot_full --- core/src/broadcast_stage.rs | 4 +- core/src/replay_stage.rs | 112 ++++++++++++++++++++++++++---------- ledger/src/block_error.rs | 3 + ledger/src/blocktree.rs | 21 ++++--- 4 files changed, 99 insertions(+), 41 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index dc4170354..b2888c702 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -298,8 +298,8 @@ mod test { ); let blocktree = broadcast_service.blocktree; - let (entries, _) = blocktree - .get_slot_entries_with_shred_count(slot, 0) + let (entries, _, _) = blocktree + .get_slot_entries_with_shred_info(slot, 0) .expect("Expect entries to be present"); assert_eq!(entries.len(), max_tick_height as usize); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 1dda41458..c91a9b257 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -427,7 +427,7 @@ impl ReplayStage { .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); let now = Instant::now(); let load_result = - Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress); + Self::load_blocktree_entries_with_shred_info(bank, blocktree, bank_progress); let fetch_entries_elapsed = now.elapsed().as_micros(); if load_result.is_err() { bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64; @@ -435,15 +435,16 @@ impl ReplayStage { bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64; } - let replay_result = load_result.and_then(|(entries, num_shreds)| { + let replay_result = load_result.and_then(|(entries, num_shreds, slot_full)| { trace!( - "Fetch entries for slot {}, {:?} entries, num shreds {:?}", + "Fetch entries for slot {}, {:?} entries, num shreds {}, slot_full: {}", bank.slot(), entries.len(), - num_shreds + num_shreds, + slot_full, ); tx_count += entries.iter().map(|e| e.transactions.len()).sum::(); - Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds) + Self::replay_entries_into_bank(bank, bank_progress, entries, num_shreds, slot_full) }); if Self::is_replay_result_fatal(&replay_result) { @@ -743,30 +744,31 @@ impl ReplayStage { }); } - fn load_blocktree_entries_with_shred_count( + fn load_blocktree_entries_with_shred_info( bank: &Bank, blocktree: &Blocktree, bank_progress: &mut ForkProgress, - ) -> Result<(Vec, usize)> { - let bank_slot = bank.slot(); - let entries_and_shred_count = blocktree - .get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?; - Ok(entries_and_shred_count) + ) -> Result<(Vec, usize, bool)> { + blocktree + .get_slot_entries_with_shred_info(bank.slot(), bank_progress.num_shreds as u64) + .map_err(|err| err.into()) } fn replay_entries_into_bank( bank: &Arc, - entries: Vec, bank_progress: &mut ForkProgress, - num: usize, + entries: Vec, + num_shreds: usize, + slot_full: bool, ) -> Result<()> { let result = Self::verify_and_process_entries( &bank, &entries, + slot_full, bank_progress.num_shreds, bank_progress, ); - bank_progress.num_shreds += num; + bank_progress.num_shreds += num_shreds; bank_progress.num_entries += entries.len(); if let Some(last_entry) = entries.last() { bank_progress.last_entry = last_entry.hash; @@ -778,26 +780,33 @@ impl ReplayStage { fn verify_ticks( bank: &Arc, entries: &[Entry], + slot_full: bool, tick_hash_count: &mut u64, ) -> std::result::Result<(), BlockError> { - if entries.is_empty() { - return Ok(()); - } - - let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0); - if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) { - return Err(BlockError::InvalidTickHashCount); - } - let next_bank_tick_height = bank.tick_height() + entries.tick_count(); let max_bank_tick_height = bank.max_tick_height(); if next_bank_tick_height > max_bank_tick_height { return Err(BlockError::InvalidTickCount); } - let has_trailing_entry = !entries.last().unwrap().is_tick(); - if next_bank_tick_height == max_bank_tick_height && has_trailing_entry { - return Err(BlockError::TrailingEntry); + if next_bank_tick_height < max_bank_tick_height && slot_full { + return Err(BlockError::InvalidTickCount); + } + + if next_bank_tick_height == max_bank_tick_height { + let has_trailing_entry = !entries.last().unwrap().is_tick(); + if has_trailing_entry { + return Err(BlockError::TrailingEntry); + } + + if !slot_full { + return Err(BlockError::InvalidLastTick); + } + } + + let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0); + if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) { + return Err(BlockError::InvalidTickHashCount); } Ok(()) @@ -806,6 +815,7 @@ impl ReplayStage { fn verify_and_process_entries( bank: &Arc, entries: &[Entry], + slot_full: bool, shred_index: usize, bank_progress: &mut ForkProgress, ) -> Result<()> { @@ -813,14 +823,15 @@ impl ReplayStage { let tick_hash_count = &mut bank_progress.tick_hash_count; let handle_block_error = move |block_error: BlockError| -> Result<()> { warn!( - "{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}", + "{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}, slot_full: {}", block_error, bank.slot(), entries.len(), bank.tick_height(), last_entry, bank.last_blockhash(), - shred_index + shred_index, + slot_full, ); datapoint_error!( @@ -832,7 +843,7 @@ impl ReplayStage { Err(Error::BlockError(block_error)) }; - if let Err(block_error) = Self::verify_ticks(bank, entries, tick_hash_count) { + if let Err(block_error) = Self::verify_ticks(bank, entries, slot_full, tick_hash_count) { return handle_block_error(block_error); } @@ -1088,6 +1099,7 @@ mod test { #[test] fn test_dead_fork_invalid_slot_tick_count() { + // Too many ticks per slot let res = check_dead_fork(|_keypair, bank| { let blockhash = bank.last_blockhash(); let slot = bank.slot(); @@ -1105,6 +1117,46 @@ mod test { } else { assert!(false); } + + // Too few ticks per slot + let res = check_dead_fork(|_keypair, bank| { + let blockhash = bank.last_blockhash(); + let slot = bank.slot(); + let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0); + entries_to_test_shreds( + entry::create_ticks(bank.ticks_per_slot() - 1, hashes_per_tick, blockhash), + slot, + slot.saturating_sub(1), + true, + ) + }); + + if let Err(Error::BlockError(block_error)) = res { + assert_eq!(block_error, BlockError::InvalidTickCount); + } else { + assert!(false); + } + } + + #[test] + fn test_dead_fork_invalid_last_tick() { + let res = check_dead_fork(|_keypair, bank| { + let blockhash = bank.last_blockhash(); + let slot = bank.slot(); + let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0); + entries_to_test_shreds( + entry::create_ticks(bank.ticks_per_slot(), hashes_per_tick, blockhash), + slot, + slot.saturating_sub(1), + false, + ) + }); + + if let Err(Error::BlockError(block_error)) = res { + assert_eq!(block_error, BlockError::InvalidLastTick); + } else { + assert!(false); + } } #[test] @@ -1121,7 +1173,7 @@ mod test { system_transaction::transfer(&genesis_keypair, &keypair.pubkey(), 2, blockhash); let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]); entries.push(trailing_entry); - entries_to_test_shreds(entries, slot, slot.saturating_sub(1), false) + entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true) }); if let Err(Error::BlockError(block_error)) = res { diff --git a/ledger/src/block_error.rs b/ledger/src/block_error.rs index eca7342a7..a60aafabb 100644 --- a/ledger/src/block_error.rs +++ b/ledger/src/block_error.rs @@ -3,6 +3,9 @@ pub enum BlockError { /// Block entries hashes must all be valid InvalidEntryHash, + /// Blocks must end in a tick that has been marked as the last tick. + InvalidLastTick, + /// Blocks can not have extra ticks or missing ticks InvalidTickCount, diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index e1bfc59e2..b16bbf460 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -1073,19 +1073,21 @@ impl Blocktree { shred_start_index: u64, _max_entries: Option, ) -> Result> { - self.get_slot_entries_with_shred_count(slot, shred_start_index) + self.get_slot_entries_with_shred_info(slot, shred_start_index) .map(|x| x.0) } - pub fn get_slot_entries_with_shred_count( + /// Returns the entry vector for the slot starting with `shred_start_index`, the number of + /// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds). + pub fn get_slot_entries_with_shred_info( &self, slot: Slot, start_index: u64, - ) -> Result<(Vec, usize)> { + ) -> Result<(Vec, usize, bool)> { let slot_meta_cf = self.db.column::(); let slot_meta = slot_meta_cf.get(slot)?; if slot_meta.is_none() { - return Ok((vec![], 0)); + return Ok((vec![], 0, false)); } let slot_meta = slot_meta.unwrap(); @@ -1096,13 +1098,14 @@ impl Blocktree { slot_meta.consumed as u32, ); if completed_ranges.is_empty() { - return Ok((vec![], 0)); + return Ok((vec![], 0, false)); } let num_shreds = completed_ranges .last() - .map(|(_, end_index)| u64::from(*end_index) - start_index + 1); + .map(|(_, end_index)| u64::from(*end_index) - start_index + 1) + .unwrap_or(0) as usize; - let all_entries: Result>> = PAR_THREAD_POOL.with(|thread_pool| { + let entries: Result>> = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { completed_ranges .par_iter() @@ -1113,8 +1116,8 @@ impl Blocktree { }) }); - let all_entries: Vec = all_entries?.into_iter().flatten().collect(); - Ok((all_entries, num_shreds.unwrap_or(0) as usize)) + let entries: Vec = entries?.into_iter().flatten().collect(); + Ok((entries, num_shreds, slot_meta.is_full())) } // Get the range of indexes [start_index, end_index] of every completed data block