Async poh verify (#6353)
* Async poh verify

* Up ticks_per_s to 160

  GPU poh verify needs shorter poh sequences or it takes forever to
  verify. Keep slot time the same at 400ms.

* Fix stats

* Don't halt on ticks

* Increase retries for local_cluster tests and make repairman test serial
parent 35cc74ef25
commit 03d29a8311
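Note: the core of this change is splitting entry verification into a `start_verify`/`finish_verify` pair so PoH verification (on GPU when available) runs concurrently with transaction replay. A minimal sketch of that call pattern, using a plain thread and toy types as stand-ins for the real `EntryVerifyState` (the "verification" here is a trivial placeholder check):

    use std::thread;

    // Toy stand-in for EntryVerifyState: the expensive check runs on a
    // background thread; finish_verify() joins it and returns the verdict.
    struct VerifyState {
        handle: thread::JoinHandle<bool>,
    }

    fn start_verify(entries: Vec<u64>) -> VerifyState {
        VerifyState {
            // Placeholder "verification": the sequence must be non-decreasing.
            handle: thread::spawn(move || entries.windows(2).all(|w| w[0] <= w[1])),
        }
    }

    impl VerifyState {
        fn finish_verify(self) -> bool {
            self.handle.join().unwrap()
        }
    }

    fn main() {
        let state = start_verify(vec![1, 2, 3]);
        // ... transaction replay would run here, overlapped with verification ...
        assert!(state.finish_verify());
    }
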
@@ -115,11 +115,15 @@ mod tests {
             })
             .collect();

-        let result = recv_slot_entries(&r).unwrap();
-
-        assert_eq!(result.bank.slot(), bank1.slot());
-        assert_eq!(result.last_tick_height, bank1.max_tick_height());
-        assert_eq!(result.entries, entries);
+        let mut res_entries = vec![];
+        let mut last_tick_height = 0;
+        while let Ok(result) = recv_slot_entries(&r) {
+            assert_eq!(result.bank.slot(), bank1.slot());
+            last_tick_height = result.last_tick_height;
+            res_entries.extend(result.entries);
+        }
+        assert_eq!(last_tick_height, bank1.max_tick_height());
+        assert_eq!(res_entries, entries);
     }

     #[test]
@@ -152,9 +156,16 @@ mod tests {
             .unwrap()
             .unwrap();

-        let result = recv_slot_entries(&r).unwrap();
-        assert_eq!(result.bank.slot(), bank2.slot());
-        assert_eq!(result.last_tick_height, expected_last_height);
-        assert_eq!(result.entries, vec![last_entry]);
+        let mut res_entries = vec![];
+        let mut last_tick_height = 0;
+        let mut bank_slot = 0;
+        while let Ok(result) = recv_slot_entries(&r) {
+            bank_slot = result.bank.slot();
+            last_tick_height = result.last_tick_height;
+            res_entries = result.entries;
+        }
+        assert_eq!(bank_slot, bank2.slot());
+        assert_eq!(last_tick_height, expected_last_height);
+        assert_eq!(res_entries, vec![last_entry]);
     }
 }
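Note: both tests above switch from a single `recv_slot_entries` call to draining the channel, since a slot's entries may now arrive in several batches. The drain pattern in miniature (plain `std::sync::mpsc`, toy data):

    use std::sync::mpsc::channel;

    fn main() {
        let (s, r) = channel();
        s.send(vec![1, 2]).unwrap();
        s.send(vec![3]).unwrap();
        drop(s); // close the channel so recv() returns Err once drained

        let mut all = vec![];
        while let Ok(batch) = r.recv() {
            all.extend(batch);
        }
        assert_eq!(all, vec![1, 2, 3]);
    }
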
@@ -963,7 +963,7 @@ mod tests {
             poh_recorder.reset(hash(b"hello"), 0, Some((4, 4))); // parent slot 0 implies tick_height of 3
             assert_eq!(poh_recorder.tick_cache.len(), 0);
             poh_recorder.tick();
-            assert_eq!(poh_recorder.tick_height, 5);
+            assert_eq!(poh_recorder.tick_height, DEFAULT_TICKS_PER_SLOT + 1);
         }
         Blocktree::destroy(&ledger_path).unwrap();
     }
@@ -16,6 +16,7 @@ use solana_ledger::blocktree_processor;
 use solana_ledger::entry::{Entry, EntrySlice};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
 use solana_ledger::snapshot_package::SnapshotPackageSender;
+use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_warn, inc_new_counter_info};
 use solana_runtime::bank::Bank;
 use solana_sdk::hash::Hash;
@@ -764,13 +765,17 @@ impl ReplayStage {
         shred_index: usize,
         bank_progress: &mut ForkProgress,
     ) -> Result<()> {
-        let now = Instant::now();
-        let last_entry = &bank_progress.last_entry;
         datapoint_info!("verify-batch-size", ("size", entries.len() as i64, i64));
-        let verify_result = entries.verify(last_entry);
-        let verify_entries_elapsed = now.elapsed().as_micros();
-        bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
-        if !verify_result {
+        let mut verify_total = Measure::start("verify_and_process_entries");
+        let last_entry = &bank_progress.last_entry;
+        let mut entry_state = entries.start_verify(last_entry);
+
+        let mut replay_elapsed = Measure::start("replay_elapsed");
+        let res = blocktree_processor::process_entries(bank, entries, true);
+        replay_elapsed.stop();
+        bank_progress.stats.replay_elapsed += replay_elapsed.as_us();
+
+        if !entry_state.finish_verify(entries) {
             info!(
                 "entry verification failed, slot: {}, entry len: {}, tick_height: {}, last entry: {}, last_blockhash: {}, shred_index: {}",
                 bank.slot(),
@@ -788,11 +793,9 @@ impl ReplayStage {
             );
             return Err(Error::BlobError(BlobError::VerificationFailed));
         }

-        let now = Instant::now();
-        let res = blocktree_processor::process_entries(bank, entries, true);
-        let replay_elapsed = now.elapsed().as_micros();
-        bank_progress.stats.replay_elapsed += replay_elapsed as u64;
+        verify_total.stop();
+        bank_progress.stats.entry_verification_elapsed =
+            verify_total.as_us() - replay_elapsed.as_us();

         res?;
         Ok(())
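Note: since replay now runs inside the verification window, the verification-only cost is reported as total time minus replay time. A self-contained sketch of that bookkeeping, assuming the `solana_measure` API used in the diff (the `do_replay`/`poll_verify` helpers are hypothetical stand-ins):

    use solana_measure::measure::Measure;

    fn do_replay() {}   // stands in for blocktree_processor::process_entries
    fn poll_verify() {} // stands in for EntryVerifyState::finish_verify

    fn main() {
        let mut verify_total = Measure::start("verify_and_process_entries");

        let mut replay_elapsed = Measure::start("replay_elapsed");
        do_replay();
        replay_elapsed.stop();

        poll_verify();
        verify_total.stop();

        // Replay overlapped the verification window, so subtract it out.
        let verification_only_us = verify_total.as_us() - replay_elapsed.as_us();
        println!("verification-only: {} us", verification_only_us);
    }
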
@@ -948,7 +951,7 @@ mod test {
         let missing_keypair = Keypair::new();
         let missing_keypair2 = Keypair::new();

-        let res = check_dead_fork(|blockhash, slot| {
+        let res = check_dead_fork(|_keypair, blockhash, slot| {
             let entry = entry::next_entry(
                 blockhash,
                 1,
@@ -973,16 +976,15 @@ mod test {

     #[test]
     fn test_dead_fork_entry_verification_failure() {
-        let keypair1 = Keypair::new();
         let keypair2 = Keypair::new();
-        let res = check_dead_fork(|blockhash, slot| {
+        let res = check_dead_fork(|genesis_keypair, blockhash, slot| {
             let bad_hash = hash(&[2; 30]);
             let entry = entry::next_entry(
-                // User wrong blockhash so that the entry causes an entry verification failure
+                // Use wrong blockhash so that the entry causes an entry verification failure
                 &bad_hash,
                 1,
                 vec![system_transaction::transfer_now(
-                    &keypair1,
+                    &genesis_keypair,
                     &keypair2.pubkey(),
                     2,
                     *blockhash,
@@ -997,7 +999,7 @@ mod test {
     #[test]
     fn test_dead_fork_entry_deserialize_failure() {
         // Insert entry that causes deserialization failure
-        let res = check_dead_fork(|_, _| {
+        let res = check_dead_fork(|_, _, _| {
             let payload_len = SIZE_OF_DATA_SHRED_PAYLOAD;
             let gibberish = [0xa5u8; PACKET_DATA_SIZE];
             let mut data_header = DataShredHeader::default();
@@ -1025,19 +1027,23 @@ mod test {
     // marked as dead. Returns the error for caller to verify.
     fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
     where
-        F: Fn(&Hash, u64) -> Vec<Shred>,
+        F: Fn(&Keypair, &Hash, u64) -> Vec<Shred>,
     {
         let ledger_path = get_tmp_ledger_path!();
         let res = {
             let blocktree = Arc::new(
                 Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
             );
-            let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
+            let GenesisBlockInfo {
+                genesis_block,
+                mint_keypair,
+                ..
+            } = create_genesis_block(1000);
             let bank0 = Arc::new(Bank::new(&genesis_block));
             let mut progress = HashMap::new();
             let last_blockhash = bank0.last_blockhash();
             progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
-            let shreds = shred_to_insert(&last_blockhash, bank0.slot());
+            let shreds = shred_to_insert(&mint_keypair, &last_blockhash, bank0.slot());
             blocktree.insert_shreds(shreds, None).unwrap();
             let (res, _tx_count) =
                 ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);
@@ -20,7 +20,6 @@ use solana_sdk::transaction::Transaction;
 use std::mem::size_of;

 use solana_rayon_threadlimit::get_thread_count;
-pub const NUM_THREADS: u32 = 10;
 use std::cell::RefCell;

 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
@@ -21,7 +21,6 @@ use std::result;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

-pub const NUM_THREADS: u32 = 10;
 use solana_rayon_threadlimit::get_thread_count;
 use std::cell::RefCell;

@@ -109,12 +108,11 @@ fn process_entries_with_callback(
 ) -> Result<()> {
     // accumulator for entries that can be processed in parallel
     let mut batches = vec![];
+    let mut tick_hashes = vec![];
     for entry in entries {
         if entry.is_tick() {
-            // if its a tick, execute the group and register the tick
-            execute_batches(bank, &batches, entry_callback)?;
-            batches.clear();
-            bank.register_tick(&entry.hash);
+            // if its a tick, save it for later
+            tick_hashes.push(entry.hash);
             continue;
         }
         // else loop on processing the entry
@@ -164,6 +162,9 @@ fn process_entries_with_callback(
         }
     }
     execute_batches(bank, &batches, entry_callback)?;
+    for hash in tick_hashes {
+        bank.register_tick(&hash);
+    }
     Ok(())
 }

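Note: this is the "Don't halt on ticks" part of the change. Instead of flushing the batch and registering each tick as it is encountered, ticks are collected and registered only after all entry batches have executed. The control flow in miniature (simplified `Entry` and stub helpers, not the real types):

    struct Entry {
        is_tick: bool,
        hash: u64,
    }

    fn execute_batch(_batch: &[&Entry]) {} // stands in for execute_batches
    fn register_tick(_hash: u64) {}        // stands in for bank.register_tick

    fn process(entries: &[Entry]) {
        let mut batch = vec![];
        let mut tick_hashes = vec![];
        for entry in entries {
            if entry.is_tick {
                // Don't halt the batch on a tick; just remember its hash.
                tick_hashes.push(entry.hash);
                continue;
            }
            batch.push(entry);
        }
        // One large parallel-friendly batch instead of one batch per tick gap.
        execute_batch(&batch);
        // Ticks are registered only after every entry has executed.
        for hash in tick_hashes {
            register_tick(hash);
        }
    }

    fn main() {
        process(&[
            Entry { is_tick: false, hash: 1 },
            Entry { is_tick: true, hash: 2 },
            Entry { is_tick: false, hash: 3 },
        ]);
    }
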
@@ -8,6 +8,7 @@ use log::*;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 use serde::{Deserialize, Serialize};
+use solana_measure::measure::Measure;
 use solana_merkle_tree::MerkleTree;
 use solana_metrics::*;
 use solana_rayon_threadlimit::get_thread_count;
@@ -18,10 +19,9 @@ use std::cell::RefCell;
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::{Arc, Mutex};
+use std::thread;
+use std::thread::JoinHandle;
 use std::time::Instant;

-pub const NUM_THREADS: u32 = 10;
-
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
     .num_threads(get_thread_count())
     .build()
@@ -162,15 +162,70 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction
     }
 }

+pub struct EntryVerifyState {
+    thread_h: Option<JoinHandle<u64>>,
+    hashes: Option<Arc<Mutex<Vec<Hash>>>>,
+    verified: bool,
+    tx_hashes: Vec<Option<Hash>>,
+    start_time_ms: u64,
+}
+
+impl EntryVerifyState {
+    pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
+        if self.hashes.is_some() {
+            let gpu_time_ms = self.thread_h.take().unwrap().join().unwrap();
+
+            let mut verify_check_time = Measure::start("verify_check");
+            let hashes = self.hashes.take().expect("hashes.as_ref");
+            let hashes = Arc::try_unwrap(hashes)
+                .expect("unwrap Arc")
+                .into_inner()
+                .expect("into_inner");
+            let res = PAR_THREAD_POOL.with(|thread_pool| {
+                thread_pool.borrow().install(|| {
+                    hashes
+                        .into_par_iter()
+                        .zip(&self.tx_hashes)
+                        .zip(entries)
+                        .all(|((hash, tx_hash), answer)| {
+                            if answer.num_hashes == 0 {
+                                hash == answer.hash
+                            } else {
+                                let mut poh = Poh::new(hash, None);
+                                if let Some(mixin) = tx_hash {
+                                    poh.record(*mixin).unwrap().hash == answer.hash
+                                } else {
+                                    poh.tick().unwrap().hash == answer.hash
+                                }
+                            }
+                        })
+                })
+            });
+            verify_check_time.stop();
+            inc_new_counter_warn!(
+                "entry_verify-duration",
+                (gpu_time_ms + verify_check_time.as_ms() + self.start_time_ms) as usize
+            );
+            res
+        } else {
+            self.verified
+        }
+    }
+}
+
 // an EntrySlice is a slice of Entries
 pub trait EntrySlice {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
-    fn verify_cpu(&self, start_hash: &Hash) -> bool;
+    fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState;
+    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState;
     fn verify(&self, start_hash: &Hash) -> bool;
 }

 impl EntrySlice for [Entry] {
-    fn verify_cpu(&self, start_hash: &Hash) -> bool {
+    fn verify(&self, start_hash: &Hash) -> bool {
+        self.start_verify(start_hash).finish_verify(self)
+    }
+    fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState {
         let now = Instant::now();
         let genesis = [Entry {
             num_hashes: 0,
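Note: `finish_verify` above covers two cases: if a GPU thread was started it joins the thread and runs the final per-entry CPU check; otherwise it returns the verdict the synchronous CPU path already computed. The two paths in a runnable toy form (simplified state, real fields omitted):

    use std::thread::JoinHandle;

    // Toy version of the two finish_verify() paths above.
    struct State {
        thread_h: Option<JoinHandle<bool>>,
        verified: bool,
    }

    impl State {
        fn finish_verify(&mut self) -> bool {
            if let Some(h) = self.thread_h.take() {
                h.join().unwrap() // asynchronous (GPU) path: join and check
            } else {
                self.verified // synchronous CPU path already decided
            }
        }
    }

    fn main() {
        let mut cpu = State { thread_h: None, verified: true };
        assert!(cpu.finish_verify());

        let mut gpu = State {
            thread_h: Some(std::thread::spawn(|| true)),
            verified: false,
        };
        assert!(gpu.finish_verify());
    }
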
@@ -198,10 +253,16 @@ impl EntrySlice for [Entry] {
             "entry_verify-duration",
             timing::duration_as_ms(&now.elapsed()) as usize
         );
-        res
+        EntryVerifyState {
+            thread_h: None,
+            verified: res,
+            hashes: None,
+            tx_hashes: vec![],
+            start_time_ms: 0,
+        }
     }

-    fn verify(&self, start_hash: &Hash) -> bool {
+    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState {
         let api = perf_libs::api();
         if api.is_none() {
             return self.verify_cpu(start_hash);
@@ -209,11 +270,6 @@ impl EntrySlice for [Entry] {
         let api = api.unwrap();
         inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize);

-        // Use CPU verify if the batch length is < 1K
-        if self.len() < 1024 {
-            return self.verify_cpu(start_hash);
-        }
-
         let start = Instant::now();

         let genesis = [Entry {
@@ -238,9 +294,9 @@ impl EntrySlice for [Entry] {
         let hashes = Arc::new(Mutex::new(hashes));
         let hashes_clone = hashes.clone();

-        let gpu_wait = Instant::now();
         let gpu_verify_thread = thread::spawn(move || {
             let mut hashes = hashes_clone.lock().unwrap();
+            let gpu_wait = Instant::now();
             let res;
             unsafe {
                 res = (api.poh_verify_many)(
@@ -253,9 +309,14 @@ impl EntrySlice for [Entry] {
             if res != 0 {
                 panic!("GPU PoH verify many failed");
             }
+            inc_new_counter_warn!(
+                "entry_verify-gpu_thread",
+                timing::duration_as_ms(&gpu_wait.elapsed()) as usize
+            );
+            timing::duration_as_ms(&gpu_wait.elapsed())
         });

-        let tx_hashes: Vec<Option<Hash>> = PAR_THREAD_POOL.with(|thread_pool| {
+        let tx_hashes = PAR_THREAD_POOL.with(|thread_pool| {
             thread_pool.borrow().install(|| {
                 self.into_par_iter()
                     .map(|entry| {
@@ -269,37 +330,13 @@ impl EntrySlice for [Entry] {
                     })
             });

-        gpu_verify_thread.join().unwrap();
-        inc_new_counter_warn!(
-            "entry_verify-gpu_thread",
-            timing::duration_as_ms(&gpu_wait.elapsed()) as usize
-        );
-
-        let hashes = Arc::try_unwrap(hashes).unwrap().into_inner().unwrap();
-        let res =
-            PAR_THREAD_POOL.with(|thread_pool| {
-                thread_pool.borrow().install(|| {
-                    hashes.into_par_iter().zip(tx_hashes).zip(self).all(
-                        |((hash, tx_hash), answer)| {
-                            if answer.num_hashes == 0 {
-                                hash == answer.hash
-                            } else {
-                                let mut poh = Poh::new(hash, None);
-                                if let Some(mixin) = tx_hash {
-                                    poh.record(mixin).unwrap().hash == answer.hash
-                                } else {
-                                    poh.tick().unwrap().hash == answer.hash
-                                }
-                            }
-                        },
-                    )
-                })
-            });
-        inc_new_counter_warn!(
-            "entry_verify-duration",
-            timing::duration_as_ms(&start.elapsed()) as usize
-        );
-        res
+        EntryVerifyState {
+            thread_h: Some(gpu_verify_thread),
+            verified: false,
+            tx_hashes,
+            start_time_ms: timing::duration_as_ms(&start.elapsed()),
+            hashes: Some(hashes),
+        }
     }
 }
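Note: the GPU wait timer now starts inside the spawned thread (after the hash buffer lock is taken), and the thread returns its elapsed milliseconds through the `JoinHandle` so `finish_verify` can fold it into the aggregate counter. The pattern in isolation (the real code uses `timing::duration_as_ms` and `poh_verify_many`):

    use std::thread;
    use std::time::Instant;

    fn main() {
        let gpu_verify_thread: thread::JoinHandle<u64> = thread::spawn(|| {
            let gpu_wait = Instant::now();
            // ... (api.poh_verify_many)(...) would run here ...
            gpu_wait.elapsed().as_millis() as u64
        });

        // Other work proceeds here; join later to collect the timing.
        let gpu_time_ms = gpu_verify_thread.join().unwrap();
        println!("gpu thread: {} ms", gpu_time_ms);
    }
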
@@ -57,7 +57,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher>(
         system_transaction::transfer(&funding_keypair, &random_keypair.pubkey(), 1, blockhash);
     let confs = VOTE_THRESHOLD_DEPTH + 1;
     let sig = client
-        .retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 5, confs)
+        .retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 10, confs)
         .unwrap();
     for validator in &cluster_nodes {
         if ignore_nodes.contains(&validator.id) {
@@ -434,7 +434,7 @@ impl LocalCluster {
             *dest_pubkey
         );
         client
-            .retry_transfer(&source_keypair, &mut tx, 5)
+            .retry_transfer(&source_keypair, &mut tx, 10)
             .expect("client transfer");
         client
             .wait_for_balance(dest_pubkey, Some(lamports))
@@ -472,7 +472,7 @@ impl LocalCluster {
             client.get_recent_blockhash().unwrap().0,
         );
         client
-            .retry_transfer(&from_account, &mut transaction, 5)
+            .retry_transfer(&from_account, &mut transaction, 10)
             .expect("fund vote");
         client
             .wait_for_balance(&vote_account_pubkey, Some(amount))
@@ -571,7 +571,7 @@ impl LocalCluster {
         let blockhash = client.get_recent_blockhash().unwrap().0;
         let mut transaction = Transaction::new(&signer_keys, message, blockhash);
         client
-            .retry_transfer(&from_keypair, &mut transaction, 5)
+            .retry_transfer(&from_keypair, &mut transaction, 10)
             .map(|_signature| ())
     }
 }
@@ -624,6 +624,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) {
 }

 #[test]
+#[serial]
 fn test_repairman_catchup() {
     solana_logger::setup();
     error!("test_repairman_catchup");
@@ -2,11 +2,11 @@

 // The default tick rate that the cluster attempts to achieve. Note that the actual tick
 // rate at any given time should be expected to drift
-pub const DEFAULT_TICKS_PER_SECOND: u64 = 10;
+pub const DEFAULT_TICKS_PER_SECOND: u64 = 160;

-// At 10 ticks/s, 4 ticks per slot implies that leader rotation and voting will happen
+// At 160 ticks/s, 64 ticks per slot implies that leader rotation and voting will happen
 // every 400 ms. A fast voting cadence ensures faster finality and convergence
-pub const DEFAULT_TICKS_PER_SLOT: u64 = 4;
+pub const DEFAULT_TICKS_PER_SLOT: u64 = 64;

 // 1 Epoch = 400 * 8192 ms ~= 55 minutes
 pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 8192;
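Note: a quick sanity check that the new constants keep the slot time at 400 ms (as the commit message states) and leave the epoch length at roughly 55 minutes:

    const DEFAULT_TICKS_PER_SECOND: u64 = 160;
    const DEFAULT_TICKS_PER_SLOT: u64 = 64;
    const DEFAULT_SLOTS_PER_EPOCH: u64 = 8192;

    fn main() {
        // 64 ticks / (160 ticks/s) = 0.4 s per slot.
        let slot_ms = DEFAULT_TICKS_PER_SLOT * 1000 / DEFAULT_TICKS_PER_SECOND;
        assert_eq!(slot_ms, 400);

        // 400 ms * 8192 slots = 3276.8 s ~= 55 minutes per epoch.
        let epoch_s = slot_ms * DEFAULT_SLOTS_PER_EPOCH / 1000;
        assert_eq!(epoch_s / 60, 54); // integer minutes; ~55 min
    }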