From 727be309b2d5cb9e21b91fa680f7c9c4a32e85ed Mon Sep 17 00:00:00 2001 From: Parth Date: Tue, 24 Dec 2019 12:56:27 +0530 Subject: [PATCH] fix entryverification state (#7169) automerge --- core/src/banking_stage.rs | 4 +- core/src/replay_stage.rs | 5 + ledger/src/entry.rs | 174 ++++++++++++++++++----------- local-cluster/src/cluster_tests.rs | 2 +- 4 files changed, 118 insertions(+), 67 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1f22b2d14..1fe91cc95 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1118,7 +1118,7 @@ mod tests { .collect(); trace!("done"); assert_eq!(entries.len(), genesis_config.ticks_per_slot as usize); - assert!(entries.verify(&start_hash)); + assert_eq!(entries.verify(&start_hash), true); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } @@ -1220,7 +1220,7 @@ mod tests { .map(|(_bank, (entry, _tick_height))| entry) .collect(); - assert!(entries.verify(&blockhash)); + assert_eq!(entries.verify(&blockhash), true); if !entries.is_empty() { blockhash = entries.last().unwrap().hash; for entry in entries { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 6668de57e..84a1657b8 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -9,6 +9,7 @@ use crate::{ rpc_subscriptions::RpcSubscriptions, thread_mem_usage, }; +use solana_ledger::entry::EntryVerificationStatus; use solana_ledger::{ bank_forks::BankForks, block_error::BlockError, @@ -1071,6 +1072,10 @@ impl ReplayStage { let mut verify_total = Measure::start("verify_and_process_entries"); let mut entry_state = entries.start_verify(last_entry, recyclers.clone()); + if entry_state.status() == EntryVerificationStatus::Failure { + return handle_block_error(BlockError::InvalidEntryHash); + } + let mut replay_elapsed = Measure::start("replay_elapsed"); let res = blocktree_processor::process_entries(bank, entries, true, transaction_status_sender); diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 86c3cf4c2..a91d16bb0 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -18,12 +18,11 @@ use solana_sdk::hash::Hash; use solana_sdk::timing; use solana_sdk::transaction::Transaction; use std::cell::RefCell; -use std::cmp; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex}; -use std::thread; use std::thread::JoinHandle; use std::time::Instant; +use std::{cmp, thread}; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) @@ -151,10 +150,10 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction } } -pub struct EntryVerifyState { +pub struct VerificationData { thread_h: Option>, + verification_status: EntryVerificationStatus, hashes: Option>>>, - verified: bool, tx_hashes: Vec>, start_time_ms: u64, } @@ -165,46 +164,75 @@ pub struct VerifyRecyclers { tick_count_recycler: Recycler>, } -impl EntryVerifyState { +#[derive(PartialEq, Clone, Copy, Debug)] +pub enum EntryVerificationStatus { + Failure, + Success, + Pending, +} + +pub enum EntryVerificationState { + CPU(VerificationData), + GPU(VerificationData), +} + +impl EntryVerificationState { + pub fn status(&self) -> EntryVerificationStatus { + match self { + EntryVerificationState::CPU(state) => state.verification_status, + EntryVerificationState::GPU(state) => state.verification_status, + } + } + pub fn finish_verify(&mut self, entries: &[Entry]) -> bool { - if self.hashes.is_some() { - let gpu_time_ms = self.thread_h.take().unwrap().join().unwrap(); + match self { + EntryVerificationState::GPU(verification_state) => { + let gpu_time_ms = verification_state.thread_h.take().unwrap().join().unwrap(); - let mut verify_check_time = Measure::start("verify_check"); - let hashes = self.hashes.take().expect("hashes.as_ref"); - let hashes = Arc::try_unwrap(hashes) - .expect("unwrap Arc") - .into_inner() - .expect("into_inner"); - let res = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - hashes - .into_par_iter() - .zip(&self.tx_hashes) - .zip(entries) - .all(|((hash, tx_hash), answer)| { - if answer.num_hashes == 0 { - *hash == answer.hash - } else { - let mut poh = Poh::new(*hash, None); - if let Some(mixin) = tx_hash { - poh.record(*mixin).unwrap().hash == answer.hash + let mut verify_check_time = Measure::start("verify_check"); + let hashes = verification_state.hashes.take().expect("hashes.as_ref"); + let hashes = Arc::try_unwrap(hashes) + .expect("unwrap Arc") + .into_inner() + .expect("into_inner"); + let res = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + hashes + .into_par_iter() + .zip(&verification_state.tx_hashes) + .zip(entries) + .all(|((hash, tx_hash), answer)| { + if answer.num_hashes == 0 { + *hash == answer.hash } else { - poh.tick().unwrap().hash == answer.hash + let mut poh = Poh::new(*hash, None); + if let Some(mixin) = tx_hash { + poh.record(*mixin).unwrap().hash == answer.hash + } else { + poh.tick().unwrap().hash == answer.hash + } } - } - }) - }) - }); + }) + }) + }); - verify_check_time.stop(); - inc_new_counter_warn!( - "entry_verify-duration", - (gpu_time_ms + verify_check_time.as_ms() + self.start_time_ms) as usize - ); - res - } else { - self.verified + verify_check_time.stop(); + inc_new_counter_warn!( + "entry_verify-duration", + (gpu_time_ms + verify_check_time.as_ms() + verification_state.start_time_ms) + as usize + ); + + verification_state.verification_status = if res { + EntryVerificationStatus::Success + } else { + EntryVerificationStatus::Failure + }; + res + } + EntryVerificationState::CPU(verification_state) => { + verification_state.verification_status == EntryVerificationStatus::Success + } } } } @@ -212,8 +240,9 @@ impl EntryVerifyState { // an EntrySlice is a slice of Entries pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState; - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState; + fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; + fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) + -> EntryVerificationState; fn verify(&self, start_hash: &Hash) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count @@ -228,7 +257,7 @@ impl EntrySlice for [Entry] { self.start_verify(start_hash, VerifyRecyclers::default()) .finish_verify(self) } - fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState { + fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState { let now = Instant::now(); let genesis = [Entry { num_hashes: 0, @@ -256,16 +285,24 @@ impl EntrySlice for [Entry] { "entry_verify-duration", timing::duration_as_ms(&now.elapsed()) as usize ); - EntryVerifyState { + EntryVerificationState::CPU(VerificationData { thread_h: None, - verified: res, + verification_status: if res { + EntryVerificationStatus::Success + } else { + EntryVerificationStatus::Failure + }, hashes: None, tx_hashes: vec![], start_time_ms: 0, - } + }) } - fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState { + fn start_verify( + &self, + start_hash: &Hash, + recyclers: VerifyRecyclers, + ) -> EntryVerificationState { let api = perf_libs::api(); if api.is_none() { return self.verify_cpu(start_hash); @@ -341,13 +378,13 @@ impl EntrySlice for [Entry] { }) }); - EntryVerifyState { + EntryVerificationState::GPU(VerificationData { thread_h: Some(gpu_verify_thread), - verified: false, + verification_status: EntryVerificationStatus::Pending, tx_hashes, start_time_ms: timing::duration_as_ms(&start.elapsed()), hashes: Some(hashes), - } + }) } fn verify_tick_hash_count(&self, tick_hash_count: &mut u64, hashes_per_tick: u64) -> bool { @@ -515,14 +552,17 @@ mod tests { solana_logger::setup(); let zero = Hash::default(); let one = hash(&zero.as_ref()); - assert!(vec![][..].verify(&zero)); // base case - assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1 - assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad - assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step + assert_eq!(vec![][..].verify(&zero), true); // base case + assert_eq!(vec![Entry::new_tick(0, &zero)][..].verify(&zero), true); // singleton case 1 + assert_eq!(vec![Entry::new_tick(0, &zero)][..].verify(&one), false); // singleton case 2, bad + assert_eq!( + vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero), + true + ); // inductive step let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2]; bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&zero)); // inductive step, bad + assert_eq!(bad_ticks.verify(&zero), false); // inductive step, bad } #[test] @@ -531,18 +571,18 @@ mod tests { let zero = Hash::default(); let one = hash(&zero.as_ref()); let two = hash(&one.as_ref()); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![Entry::new_tick(1, &two)][..].verify(&one)); // singleton case 1 - assert!(!vec![Entry::new_tick(1, &two)][..].verify(&two)); // singleton case 2, bad + assert_eq!(vec![][..].verify(&one), true); // base case + assert_eq!(vec![Entry::new_tick(1, &two)][..].verify(&one), true); // singleton case 1 + assert_eq!(vec![Entry::new_tick(1, &two)][..].verify(&two), false); // singleton case 2, bad let mut ticks = vec![next_entry(&one, 1, vec![])]; ticks.push(next_entry(&ticks.last().unwrap().hash, 1, vec![])); - assert!(ticks.verify(&one)); // inductive step + assert_eq!(ticks.verify(&one), true); // inductive step let mut bad_ticks = vec![next_entry(&one, 1, vec![])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + assert_eq!(bad_ticks.verify(&one), false); // inductive step, bad } #[test] @@ -554,9 +594,15 @@ mod tests { let alice_pubkey = Keypair::default(); let tx0 = create_sample_payment(&alice_pubkey, one); let tx1 = create_sample_timestamp(&alice_pubkey, one); - assert!(vec![][..].verify(&one)); // base case - assert!(vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one)); // singleton case 1 - assert!(!vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two)); // singleton case 2, bad + assert_eq!(vec![][..].verify(&one), true); // base case + assert_eq!( + vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&one), + true + ); // singleton case 1 + assert_eq!( + vec![next_entry(&one, 1, vec![tx0.clone()])][..].verify(&two), + false + ); // singleton case 2, bad let mut ticks = vec![next_entry(&one, 1, vec![tx0.clone()])]; ticks.push(next_entry( @@ -564,12 +610,12 @@ mod tests { 1, vec![tx1.clone()], )); - assert!(ticks.verify(&one)); // inductive step + assert_eq!(ticks.verify(&one), true); // inductive step let mut bad_ticks = vec![next_entry(&one, 1, vec![tx0])]; bad_ticks.push(next_entry(&bad_ticks.last().unwrap().hash, 1, vec![tx1])); bad_ticks[1].hash = one; - assert!(!bad_ticks.verify(&one)); // inductive step, bad + assert_eq!(bad_ticks.verify(&one), false); // inductive step, bad } #[test] diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 7ad416271..d301f76ef 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -303,7 +303,7 @@ fn poll_all_nodes_for_signature( fn get_and_verify_slot_entries(blocktree: &Blocktree, slot: Slot, last_entry: &Hash) -> Vec { let entries = blocktree.get_slot_entries(slot, 0, None).unwrap(); - assert!(entries.verify(last_entry)); + assert_eq!(entries.verify(last_entry), true); entries }