Reject blocks with invalid last ticks in replay stage (#6833)

* Reject blocks with invalid last ticks in replay stage

* slot_full
This commit is contained in:
Justin Starry 2019-11-08 20:21:54 -05:00 committed by GitHub
parent 91be35731c
commit 02058ea699
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 41 deletions

View File

@ -298,8 +298,8 @@ mod test {
); );
let blocktree = broadcast_service.blocktree; let blocktree = broadcast_service.blocktree;
let (entries, _) = blocktree let (entries, _, _) = blocktree
.get_slot_entries_with_shred_count(slot, 0) .get_slot_entries_with_shred_info(slot, 0)
.expect("Expect entries to be present"); .expect("Expect entries to be present");
assert_eq!(entries.len(), max_tick_height as usize); assert_eq!(entries.len(), max_tick_height as usize);

View File

@ -427,7 +427,7 @@ impl ReplayStage {
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
let now = Instant::now(); let now = Instant::now();
let load_result = let load_result =
Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress); Self::load_blocktree_entries_with_shred_info(bank, blocktree, bank_progress);
let fetch_entries_elapsed = now.elapsed().as_micros(); let fetch_entries_elapsed = now.elapsed().as_micros();
if load_result.is_err() { if load_result.is_err() {
bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64; bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
@ -435,15 +435,16 @@ impl ReplayStage {
bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64; bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64;
} }
let replay_result = load_result.and_then(|(entries, num_shreds)| { let replay_result = load_result.and_then(|(entries, num_shreds, slot_full)| {
trace!( trace!(
"Fetch entries for slot {}, {:?} entries, num shreds {:?}", "Fetch entries for slot {}, {:?} entries, num shreds {}, slot_full: {}",
bank.slot(), bank.slot(),
entries.len(), entries.len(),
num_shreds num_shreds,
slot_full,
); );
tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>(); tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds) Self::replay_entries_into_bank(bank, bank_progress, entries, num_shreds, slot_full)
}); });
if Self::is_replay_result_fatal(&replay_result) { if Self::is_replay_result_fatal(&replay_result) {
@ -743,30 +744,31 @@ impl ReplayStage {
}); });
} }
fn load_blocktree_entries_with_shred_count( fn load_blocktree_entries_with_shred_info(
bank: &Bank, bank: &Bank,
blocktree: &Blocktree, blocktree: &Blocktree,
bank_progress: &mut ForkProgress, bank_progress: &mut ForkProgress,
) -> Result<(Vec<Entry>, usize)> { ) -> Result<(Vec<Entry>, usize, bool)> {
let bank_slot = bank.slot(); blocktree
let entries_and_shred_count = blocktree .get_slot_entries_with_shred_info(bank.slot(), bank_progress.num_shreds as u64)
.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?; .map_err(|err| err.into())
Ok(entries_and_shred_count)
} }
fn replay_entries_into_bank( fn replay_entries_into_bank(
bank: &Arc<Bank>, bank: &Arc<Bank>,
entries: Vec<Entry>,
bank_progress: &mut ForkProgress, bank_progress: &mut ForkProgress,
num: usize, entries: Vec<Entry>,
num_shreds: usize,
slot_full: bool,
) -> Result<()> { ) -> Result<()> {
let result = Self::verify_and_process_entries( let result = Self::verify_and_process_entries(
&bank, &bank,
&entries, &entries,
slot_full,
bank_progress.num_shreds, bank_progress.num_shreds,
bank_progress, bank_progress,
); );
bank_progress.num_shreds += num; bank_progress.num_shreds += num_shreds;
bank_progress.num_entries += entries.len(); bank_progress.num_entries += entries.len();
if let Some(last_entry) = entries.last() { if let Some(last_entry) = entries.last() {
bank_progress.last_entry = last_entry.hash; bank_progress.last_entry = last_entry.hash;
@ -778,26 +780,33 @@ impl ReplayStage {
fn verify_ticks( fn verify_ticks(
bank: &Arc<Bank>, bank: &Arc<Bank>,
entries: &[Entry], entries: &[Entry],
slot_full: bool,
tick_hash_count: &mut u64, tick_hash_count: &mut u64,
) -> std::result::Result<(), BlockError> { ) -> std::result::Result<(), BlockError> {
if entries.is_empty() {
return Ok(());
}
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
return Err(BlockError::InvalidTickHashCount);
}
let next_bank_tick_height = bank.tick_height() + entries.tick_count(); let next_bank_tick_height = bank.tick_height() + entries.tick_count();
let max_bank_tick_height = bank.max_tick_height(); let max_bank_tick_height = bank.max_tick_height();
if next_bank_tick_height > max_bank_tick_height { if next_bank_tick_height > max_bank_tick_height {
return Err(BlockError::InvalidTickCount); return Err(BlockError::InvalidTickCount);
} }
let has_trailing_entry = !entries.last().unwrap().is_tick(); if next_bank_tick_height < max_bank_tick_height && slot_full {
if next_bank_tick_height == max_bank_tick_height && has_trailing_entry { return Err(BlockError::InvalidTickCount);
return Err(BlockError::TrailingEntry); }
if next_bank_tick_height == max_bank_tick_height {
let has_trailing_entry = !entries.last().unwrap().is_tick();
if has_trailing_entry {
return Err(BlockError::TrailingEntry);
}
if !slot_full {
return Err(BlockError::InvalidLastTick);
}
}
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
if !entries.verify_tick_hash_count(tick_hash_count, hashes_per_tick) {
return Err(BlockError::InvalidTickHashCount);
} }
Ok(()) Ok(())
@ -806,6 +815,7 @@ impl ReplayStage {
fn verify_and_process_entries( fn verify_and_process_entries(
bank: &Arc<Bank>, bank: &Arc<Bank>,
entries: &[Entry], entries: &[Entry],
slot_full: bool,
shred_index: usize, shred_index: usize,
bank_progress: &mut ForkProgress, bank_progress: &mut ForkProgress,
) -> Result<()> { ) -> Result<()> {
@ -813,14 +823,15 @@ impl ReplayStage {
let tick_hash_count = &mut bank_progress.tick_hash_count; let tick_hash_count = &mut bank_progress.tick_hash_count;
let handle_block_error = move |block_error: BlockError| -> Result<()> { let handle_block_error = move |block_error: BlockError| -> Result<()> {
warn!( warn!(
"{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}", "{:#?}, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}, slot_full: {}",
block_error, block_error,
bank.slot(), bank.slot(),
entries.len(), entries.len(),
bank.tick_height(), bank.tick_height(),
last_entry, last_entry,
bank.last_blockhash(), bank.last_blockhash(),
shred_index shred_index,
slot_full,
); );
datapoint_error!( datapoint_error!(
@ -832,7 +843,7 @@ impl ReplayStage {
Err(Error::BlockError(block_error)) Err(Error::BlockError(block_error))
}; };
if let Err(block_error) = Self::verify_ticks(bank, entries, tick_hash_count) { if let Err(block_error) = Self::verify_ticks(bank, entries, slot_full, tick_hash_count) {
return handle_block_error(block_error); return handle_block_error(block_error);
} }
@ -1088,6 +1099,7 @@ mod test {
#[test] #[test]
fn test_dead_fork_invalid_slot_tick_count() { fn test_dead_fork_invalid_slot_tick_count() {
// Too many ticks per slot
let res = check_dead_fork(|_keypair, bank| { let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let slot = bank.slot(); let slot = bank.slot();
@ -1105,6 +1117,46 @@ mod test {
} else { } else {
assert!(false); assert!(false);
} }
// Too few ticks per slot
let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
entries_to_test_shreds(
entry::create_ticks(bank.ticks_per_slot() - 1, hashes_per_tick, blockhash),
slot,
slot.saturating_sub(1),
true,
)
});
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidTickCount);
} else {
assert!(false);
}
}
#[test]
fn test_dead_fork_invalid_last_tick() {
let res = check_dead_fork(|_keypair, bank| {
let blockhash = bank.last_blockhash();
let slot = bank.slot();
let hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
entries_to_test_shreds(
entry::create_ticks(bank.ticks_per_slot(), hashes_per_tick, blockhash),
slot,
slot.saturating_sub(1),
false,
)
});
if let Err(Error::BlockError(block_error)) = res {
assert_eq!(block_error, BlockError::InvalidLastTick);
} else {
assert!(false);
}
} }
#[test] #[test]
@ -1121,7 +1173,7 @@ mod test {
system_transaction::transfer(&genesis_keypair, &keypair.pubkey(), 2, blockhash); system_transaction::transfer(&genesis_keypair, &keypair.pubkey(), 2, blockhash);
let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]); let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]);
entries.push(trailing_entry); entries.push(trailing_entry);
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), false) entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true)
}); });
if let Err(Error::BlockError(block_error)) = res { if let Err(Error::BlockError(block_error)) = res {

View File

@ -3,6 +3,9 @@ pub enum BlockError {
/// Block entries hashes must all be valid /// Block entries hashes must all be valid
InvalidEntryHash, InvalidEntryHash,
/// Blocks must end in a tick that has been marked as the last tick.
InvalidLastTick,
/// Blocks can not have extra ticks or missing ticks /// Blocks can not have extra ticks or missing ticks
InvalidTickCount, InvalidTickCount,

View File

@ -1073,19 +1073,21 @@ impl Blocktree {
shred_start_index: u64, shred_start_index: u64,
_max_entries: Option<u64>, _max_entries: Option<u64>,
) -> Result<Vec<Entry>> { ) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_count(slot, shred_start_index) self.get_slot_entries_with_shred_info(slot, shred_start_index)
.map(|x| x.0) .map(|x| x.0)
} }
pub fn get_slot_entries_with_shred_count( /// Returns the entry vector for the slot starting with `shred_start_index`, the number of
/// shreds that comprise the entry vector, and whether the slot is full (consumed all shreds).
pub fn get_slot_entries_with_shred_info(
&self, &self,
slot: Slot, slot: Slot,
start_index: u64, start_index: u64,
) -> Result<(Vec<Entry>, usize)> { ) -> Result<(Vec<Entry>, usize, bool)> {
let slot_meta_cf = self.db.column::<cf::SlotMeta>(); let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?; let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() { if slot_meta.is_none() {
return Ok((vec![], 0)); return Ok((vec![], 0, false));
} }
let slot_meta = slot_meta.unwrap(); let slot_meta = slot_meta.unwrap();
@ -1096,13 +1098,14 @@ impl Blocktree {
slot_meta.consumed as u32, slot_meta.consumed as u32,
); );
if completed_ranges.is_empty() { if completed_ranges.is_empty() {
return Ok((vec![], 0)); return Ok((vec![], 0, false));
} }
let num_shreds = completed_ranges let num_shreds = completed_ranges
.last() .last()
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1); .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
.unwrap_or(0) as usize;
let all_entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| { let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| { thread_pool.borrow().install(|| {
completed_ranges completed_ranges
.par_iter() .par_iter()
@ -1113,8 +1116,8 @@ impl Blocktree {
}) })
}); });
let all_entries: Vec<Entry> = all_entries?.into_iter().flatten().collect(); let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
Ok((all_entries, num_shreds.unwrap_or(0) as usize)) Ok((entries, num_shreds, slot_meta.is_full()))
} }
// Get the range of indexes [start_index, end_index] of every completed data block // Get the range of indexes [start_index, end_index] of every completed data block