From 03d29a8311dc89e093fa8d0d1d46a4c03a465b46 Mon Sep 17 00:00:00 2001
From: sakridge
Date: Wed, 23 Oct 2019 12:11:04 -0700
Subject: [PATCH] Async poh verify (#6353)

* Async poh verify

* Up ticks_per_s to 160

GPU poh verify needs shorter poh sequences or it takes forever to
verify. Keep slot time the same at 400ms.

* Fix stats

* Don't halt on ticks

* Increase retries for local_cluster tests and make repairman test serial
---
 core/src/broadcast_stage/broadcast_utils.rs |  29 +++--
 core/src/poh_recorder.rs                    |   2 +-
 core/src/replay_stage.rs                    |  46 +++----
 core/src/sigverify.rs                       |   1 -
 ledger/src/blocktree_processor.rs           |  11 +-
 ledger/src/entry.rs                         | 125 +++++++++++++------
 local_cluster/src/cluster_tests.rs          |   2 +-
 local_cluster/src/local_cluster.rs          |   6 +-
 local_cluster/src/tests/local_cluster.rs    |   1 +
 sdk/src/clock.rs                            |   6 +-
 10 files changed, 142 insertions(+), 87 deletions(-)

diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index d4df5be5e..fd7c24f0d 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -115,11 +115,15 @@ mod tests {
         })
         .collect();
 
-        let result = recv_slot_entries(&r).unwrap();
-
-        assert_eq!(result.bank.slot(), bank1.slot());
-        assert_eq!(result.last_tick_height, bank1.max_tick_height());
-        assert_eq!(result.entries, entries);
+        let mut res_entries = vec![];
+        let mut last_tick_height = 0;
+        while let Ok(result) = recv_slot_entries(&r) {
+            assert_eq!(result.bank.slot(), bank1.slot());
+            last_tick_height = result.last_tick_height;
+            res_entries.extend(result.entries);
+        }
+        assert_eq!(last_tick_height, bank1.max_tick_height());
+        assert_eq!(res_entries, entries);
     }
 
     #[test]
@@ -152,9 +156,16 @@ mod tests {
             .unwrap()
             .unwrap();
 
-        let result = recv_slot_entries(&r).unwrap();
-        assert_eq!(result.bank.slot(), bank2.slot());
-        assert_eq!(result.last_tick_height, expected_last_height);
-        assert_eq!(result.entries, vec![last_entry]);
+        let mut res_entries = vec![];
+        let mut last_tick_height = 0;
+        let mut bank_slot = 0;
+        while let Ok(result) = recv_slot_entries(&r) {
+            bank_slot = result.bank.slot();
+            last_tick_height = result.last_tick_height;
+            res_entries = result.entries;
+        }
+        assert_eq!(bank_slot, bank2.slot());
+        assert_eq!(last_tick_height, expected_last_height);
+        assert_eq!(res_entries, vec![last_entry]);
     }
 }
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index efc4a5605..516b86b73 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -963,7 +963,7 @@ mod tests {
         poh_recorder.reset(hash(b"hello"), 0, Some((4, 4))); // parent slot 0 implies tick_height of 3
         assert_eq!(poh_recorder.tick_cache.len(), 0);
         poh_recorder.tick();
-        assert_eq!(poh_recorder.tick_height, 5);
+        assert_eq!(poh_recorder.tick_height, DEFAULT_TICKS_PER_SLOT + 1);
     }
     Blocktree::destroy(&ledger_path).unwrap();
 }
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index d8910d0c5..a9fdd35e3 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -16,6 +16,7 @@ use solana_ledger::blocktree_processor;
 use solana_ledger::entry::{Entry, EntrySlice};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
 use solana_ledger::snapshot_package::SnapshotPackageSender;
+use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_warn, inc_new_counter_info};
 use solana_runtime::bank::Bank;
 use solana_sdk::hash::Hash;
@@ -764,13 +765,17 @@ impl ReplayStage {
         shred_index: usize,
         bank_progress: &mut ForkProgress,
     ) -> Result<()> {
-        let now = Instant::now();
-        let last_entry = &bank_progress.last_entry;
         datapoint_info!("verify-batch-size", ("size", entries.len() as i64, i64));
-        let verify_result = entries.verify(last_entry);
-        let verify_entries_elapsed = now.elapsed().as_micros();
-        bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
-        if !verify_result {
+        let mut verify_total = Measure::start("verify_and_process_entries");
+        let last_entry = &bank_progress.last_entry;
+        let mut entry_state = entries.start_verify(last_entry);
+
+        let mut replay_elapsed = Measure::start("replay_elapsed");
+        let res = blocktree_processor::process_entries(bank, entries, true);
+        replay_elapsed.stop();
+        bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
+
+        if !entry_state.finish_verify(entries) {
             info!(
                 "entry verification failed, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}",
                 bank.slot(),
@@ -788,11 +793,9 @@ impl ReplayStage {
             );
             return Err(Error::BlobError(BlobError::VerificationFailed));
         }
-
-        let now = Instant::now();
-        let res = blocktree_processor::process_entries(bank, entries, true);
-        let replay_elapsed = now.elapsed().as_micros();
-        bank_progress.stats.replay_elapsed += replay_elapsed as u64;
+        verify_total.stop();
+        bank_progress.stats.entry_verification_elapsed =
+            verify_total.as_us() - replay_elapsed.as_us();
 
         res?;
         Ok(())
@@ -948,7 +951,7 @@ mod test {
         let missing_keypair = Keypair::new();
         let missing_keypair2 = Keypair::new();
 
-        let res = check_dead_fork(|blockhash, slot| {
+        let res = check_dead_fork(|_keypair, blockhash, slot| {
             let entry = entry::next_entry(
                 blockhash,
                 1,
@@ -973,16 +976,15 @@ mod test {
 
     #[test]
     fn test_dead_fork_entry_verification_failure() {
-        let keypair1 = Keypair::new();
         let keypair2 = Keypair::new();
-        let res = check_dead_fork(|blockhash, slot| {
+        let res = check_dead_fork(|genesis_keypair, blockhash, slot| {
             let bad_hash = hash(&[2; 30]);
             let entry = entry::next_entry(
-                // User wrong blockhash so that the entry causes an entry verification failure
+                // Use wrong blockhash so that the entry causes an entry verification failure
                 &bad_hash,
                 1,
                 vec![system_transaction::transfer_now(
-                    &keypair1,
+                    &genesis_keypair,
                     &keypair2.pubkey(),
                     2,
                     *blockhash,
@@ -997,7 +999,7 @@ mod test {
     #[test]
     fn test_dead_fork_entry_deserialize_failure() {
         // Insert entry that causes deserialization failure
-        let res = check_dead_fork(|_, _| {
+        let res = check_dead_fork(|_, _, _| {
             let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD;
             let gibberish = [0xa5u8; PACKET_DATA_SIZE];
             let mut data_header = DataShredHeader::default();
@@ -1025,19 +1027,23 @@ mod test {
     // marked as dead. Returns the error for caller to verify.
     fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
     where
-        F: Fn(&Hash, u64) -> Vec<Shred>,
+        F: Fn(&Keypair, &Hash, u64) -> Vec<Shred>,
     {
         let ledger_path = get_tmp_ledger_path!();
         let res = {
             let blocktree = Arc::new(
                 Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
             );
-            let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
+            let GenesisBlockInfo {
+                genesis_block,
+                mint_keypair,
+                ..
+            } = create_genesis_block(1000);
             let bank0 = Arc::new(Bank::new(&genesis_block));
             let mut progress = HashMap::new();
             let last_blockhash = bank0.last_blockhash();
             progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
-            let shreds = shred_to_insert(&last_blockhash, bank0.slot());
+            let shreds = shred_to_insert(&mint_keypair, &last_blockhash, bank0.slot());
             blocktree.insert_shreds(shreds, None).unwrap();
             let (res, _tx_count) =
                 ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);
diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs
index d05aaa54b..3a7751784 100644
--- a/core/src/sigverify.rs
+++ b/core/src/sigverify.rs
@@ -20,7 +20,6 @@ use solana_sdk::transaction::Transaction;
 use std::mem::size_of;
 
 use solana_rayon_threadlimit::get_thread_count;
-pub const NUM_THREADS: u32 = 10;
 use std::cell::RefCell;
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
diff --git a/ledger/src/blocktree_processor.rs b/ledger/src/blocktree_processor.rs
index 706ab0d9f..6b97ac50d 100644
--- a/ledger/src/blocktree_processor.rs
+++ b/ledger/src/blocktree_processor.rs
@@ -21,7 +21,6 @@ use std::result;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-pub const NUM_THREADS: u32 = 10;
 use solana_rayon_threadlimit::get_thread_count;
 use std::cell::RefCell;
 
@@ -109,12 +108,11 @@ fn process_entries_with_callback(
 ) -> Result<()> {
     // accumulator for entries that can be processed in parallel
     let mut batches = vec![];
+    let mut tick_hashes = vec![];
     for entry in entries {
         if entry.is_tick() {
-            // if its a tick, execute the group and register the tick
-            execute_batches(bank, &batches, entry_callback)?;
-            batches.clear();
-            bank.register_tick(&entry.hash);
+            // if it's a tick, save it for later
+            tick_hashes.push(entry.hash);
             continue;
         }
         // else loop on processing the entry
@@ -164,6 +162,9 @@ fn process_entries_with_callback(
         }
     }
     execute_batches(bank, &batches, entry_callback)?;
+    for hash in tick_hashes {
+        bank.register_tick(&hash);
+    }
     Ok(())
 }
 
diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs
index 9ea153daf..135b6a6aa 100644
--- a/ledger/src/entry.rs
+++ b/ledger/src/entry.rs
@@ -8,6 +8,7 @@ use log::*;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 use serde::{Deserialize, Serialize};
+use solana_measure::measure::Measure;
 use solana_merkle_tree::MerkleTree;
 use solana_metrics::*;
 use solana_rayon_threadlimit::get_thread_count;
@@ -18,10 +19,9 @@ use std::cell::RefCell;
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::{Arc, Mutex};
 use std::thread;
+use std::thread::JoinHandle;
 use std::time::Instant;
 
-pub const NUM_THREADS: u32 = 10;
-
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
     .num_threads(get_thread_count())
     .build()
@@ -162,15 +162,70 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction
     }
 }
 
+pub struct EntryVerifyState {
+    thread_h: Option<JoinHandle<u64>>,
+    hashes: Option<Arc<Mutex<Vec<Hash>>>>,
+    verified: bool,
+    tx_hashes: Vec<Option<Hash>>,
+    start_time_ms: u64,
+}
+
+impl EntryVerifyState {
+    pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
+        if self.hashes.is_some() {
+            let gpu_time_ms = self.thread_h.take().unwrap().join().unwrap();
+
+            let mut verify_check_time = Measure::start("verify_check");
+            let hashes = self.hashes.take().expect("hashes.as_ref");
+            let hashes = Arc::try_unwrap(hashes)
+                .expect("unwrap Arc")
+                .into_inner()
+                .expect("into_inner");
+            let res = PAR_THREAD_POOL.with(|thread_pool| {
+                thread_pool.borrow().install(|| {
+                    hashes
+                        .into_par_iter()
+                        .zip(&self.tx_hashes)
+                        .zip(entries)
+                        .all(|((hash, tx_hash), answer)| {
+                            if answer.num_hashes == 0 {
+                                hash == answer.hash
+                            } else {
+                                let mut poh = Poh::new(hash, None);
+                                if let Some(mixin) = tx_hash {
+                                    poh.record(*mixin).unwrap().hash == answer.hash
+                                } else {
+                                    poh.tick().unwrap().hash == answer.hash
+                                }
+                            }
+                        })
+                })
+            });
+            verify_check_time.stop();
+            inc_new_counter_warn!(
+                "entry_verify-duration",
+                (gpu_time_ms + verify_check_time.as_ms() + self.start_time_ms) as usize
+            );
+            res
+        } else {
+            self.verified
+        }
+    }
+}
+
 // an EntrySlice is a slice of Entries
 pub trait EntrySlice {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
-    fn verify_cpu(&self, start_hash: &Hash) -> bool;
+    fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState;
+    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState;
     fn verify(&self, start_hash: &Hash) -> bool;
 }
 
 impl EntrySlice for [Entry] {
-    fn verify_cpu(&self, start_hash: &Hash) -> bool {
+    fn verify(&self, start_hash: &Hash) -> bool {
+        self.start_verify(start_hash).finish_verify(self)
+    }
+    fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState {
         let now = Instant::now();
         let genesis = [Entry {
             num_hashes: 0,
@@ -198,10 +253,16 @@ impl EntrySlice for [Entry] {
             "entry_verify-duration",
             timing::duration_as_ms(&now.elapsed()) as usize
         );
-        res
+        EntryVerifyState {
+            thread_h: None,
+            verified: res,
+            hashes: None,
+            tx_hashes: vec![],
+            start_time_ms: 0,
+        }
     }
 
-    fn verify(&self, start_hash: &Hash) -> bool {
+    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState {
         let api = perf_libs::api();
         if api.is_none() {
             return self.verify_cpu(start_hash);
@@ -209,11 +270,6 @@ impl EntrySlice for [Entry] {
         let api = api.unwrap();
         inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize);
 
-        // Use CPU verify if the batch length is < 1K
-        if self.len() < 1024 {
-            return self.verify_cpu(start_hash);
-        }
-
         let start = Instant::now();
 
         let genesis = [Entry {
@@ -238,9 +294,9 @@ impl EntrySlice for [Entry] {
         let hashes = Arc::new(Mutex::new(hashes));
         let hashes_clone = hashes.clone();
 
-        let gpu_wait = Instant::now();
         let gpu_verify_thread = thread::spawn(move || {
             let mut hashes = hashes_clone.lock().unwrap();
+            let gpu_wait = Instant::now();
             let res;
             unsafe {
                 res = (api.poh_verify_many)(
@@ -253,9 +309,14 @@ impl EntrySlice for [Entry] {
             if res != 0 {
                 panic!("GPU PoH verify many failed");
             }
+            inc_new_counter_warn!(
+                "entry_verify-gpu_thread",
+                timing::duration_as_ms(&gpu_wait.elapsed()) as usize
+            );
+            timing::duration_as_ms(&gpu_wait.elapsed())
         });
 
-        let tx_hashes: Vec<Option<Hash>> = PAR_THREAD_POOL.with(|thread_pool| {
+        let tx_hashes = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
                 self.into_par_iter()
                     .map(|entry| {
@@ -269,37 +330,13 @@ impl EntrySlice for [Entry] {
                     })
             })
         });
 
-        gpu_verify_thread.join().unwrap();
-        inc_new_counter_warn!(
-            "entry_verify-gpu_thread",
-            timing::duration_as_ms(&gpu_wait.elapsed()) as usize
-        );
-
-        let hashes = Arc::try_unwrap(hashes).unwrap().into_inner().unwrap();
-        let res =
-            PAR_THREAD_POOL.with(|thread_pool| {
-                thread_pool.borrow().install(|| {
-                    hashes.into_par_iter().zip(tx_hashes).zip(self).all(
-                        |((hash, tx_hash), answer)| {
-                            if answer.num_hashes == 0 {
-                                hash == answer.hash
-                            } else {
-                                let mut poh = Poh::new(hash, None);
-                                if let Some(mixin) = tx_hash {
-                                    poh.record(mixin).unwrap().hash == answer.hash
-                                } else {
-                                    poh.tick().unwrap().hash == answer.hash
-                                }
-                            }
-                        },
-                    )
-                })
-            });
-        inc_new_counter_warn!(
-            "entry_verify-duration",
"entry_verify-duration", - timing::duration_as_ms(&start.elapsed()) as usize - ); - res + EntryVerifyState { + thread_h: Some(gpu_verify_thread), + verified: false, + tx_hashes, + start_time_ms: timing::duration_as_ms(&start.elapsed()), + hashes: Some(hashes), + } } } diff --git a/local_cluster/src/cluster_tests.rs b/local_cluster/src/cluster_tests.rs index 5319110b5..bdda5a1db 100644 --- a/local_cluster/src/cluster_tests.rs +++ b/local_cluster/src/cluster_tests.rs @@ -57,7 +57,7 @@ pub fn spend_and_verify_all_nodes( system_transaction::transfer(&funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; let sig = client - .retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 5, confs) + .retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 10, confs) .unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(&validator.id) { diff --git a/local_cluster/src/local_cluster.rs b/local_cluster/src/local_cluster.rs index fc411d086..0d6c2274e 100644 --- a/local_cluster/src/local_cluster.rs +++ b/local_cluster/src/local_cluster.rs @@ -434,7 +434,7 @@ impl LocalCluster { *dest_pubkey ); client - .retry_transfer(&source_keypair, &mut tx, 5) + .retry_transfer(&source_keypair, &mut tx, 10) .expect("client transfer"); client .wait_for_balance(dest_pubkey, Some(lamports)) @@ -472,7 +472,7 @@ impl LocalCluster { client.get_recent_blockhash().unwrap().0, ); client - .retry_transfer(&from_account, &mut transaction, 5) + .retry_transfer(&from_account, &mut transaction, 10) .expect("fund vote"); client .wait_for_balance(&vote_account_pubkey, Some(amount)) @@ -571,7 +571,7 @@ impl LocalCluster { let blockhash = client.get_recent_blockhash().unwrap().0; let mut transaction = Transaction::new(&signer_keys, message, blockhash); client - .retry_transfer(&from_keypair, &mut transaction, 5) + .retry_transfer(&from_keypair, &mut transaction, 10) .map(|_signature| ()) } } diff --git a/local_cluster/src/tests/local_cluster.rs b/local_cluster/src/tests/local_cluster.rs index 8d772674e..c571854d4 100644 --- a/local_cluster/src/tests/local_cluster.rs +++ b/local_cluster/src/tests/local_cluster.rs @@ -624,6 +624,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) { } #[test] +#[serial] fn test_repairman_catchup() { solana_logger::setup(); error!("test_repairman_catchup"); diff --git a/sdk/src/clock.rs b/sdk/src/clock.rs index e8db231d5..5089b956d 100644 --- a/sdk/src/clock.rs +++ b/sdk/src/clock.rs @@ -2,11 +2,11 @@ // The default tick rate that the cluster attempts to achieve. Note that the actual tick // rate at any given time should be expected to drift -pub const DEFAULT_TICKS_PER_SECOND: u64 = 10; +pub const DEFAULT_TICKS_PER_SECOND: u64 = 160; -// At 10 ticks/s, 4 ticks per slot implies that leader rotation and voting will happen +// At 160 ticks/s, 64 ticks per slot implies that leader rotation and voting will happen // every 400 ms. A fast voting cadence ensures faster finality and convergence -pub const DEFAULT_TICKS_PER_SLOT: u64 = 4; +pub const DEFAULT_TICKS_PER_SLOT: u64 = 64; // 1 Epoch = 400 * 8192 ms ~= 55 minutes pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 8192;