diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 7cff27767..c63cb95df 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -14,7 +14,7 @@ use solana_ledger::{
     block_error::BlockError,
     blocktree::{Blocktree, BlocktreeError},
     blocktree_processor::{self, TransactionStatusSender},
-    entry::{Entry, EntrySlice},
+    entry::{Entry, EntrySlice, VerifyRecyclers},
     leader_schedule_cache::LeaderScheduleCache,
     snapshot_package::SnapshotPackageSender,
 };
@@ -210,6 +210,7 @@ impl ReplayStage {
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
+                let verify_recyclers = VerifyRecyclers::default();
                 let _exit = Finalizer::new(exit.clone());
                 let mut progress = HashMap::new();
                 // Initialize progress map with any root banks
@@ -258,6 +259,7 @@ impl ReplayStage {
                     &mut progress,
                     &slot_full_senders,
                     transaction_status_sender.clone(),
+                    &verify_recyclers,
                 );
                 datapoint_debug!(
                     "replay_stage-memory",
@@ -541,6 +543,7 @@ impl ReplayStage {
         blocktree: &Blocktree,
         bank_progress: &mut ForkProgress,
         transaction_status_sender: Option<TransactionStatusSender>,
+        verify_recyclers: &VerifyRecyclers,
     ) -> (Result<()>, usize) {
         let mut tx_count = 0;
         let now = Instant::now();
@@ -569,6 +572,7 @@ impl ReplayStage {
                 num_shreds,
                 slot_full,
                 transaction_status_sender,
+                verify_recyclers,
             )
         });
 
@@ -719,6 +723,7 @@ impl ReplayStage {
         progress: &mut HashMap<u64, ForkProgress>,
         slot_full_senders: &[Sender<(u64, Pubkey)>],
         transaction_status_sender: Option<TransactionStatusSender>,
+        verify_recyclers: &VerifyRecyclers,
     ) -> bool {
         let mut did_complete_bank = false;
         let mut tx_count = 0;
@@ -746,6 +751,7 @@ impl ReplayStage {
                 &blocktree,
                 bank_progress,
                 transaction_status_sender.clone(),
+                verify_recyclers,
             );
             tx_count += replay_tx_count;
             if Self::is_replay_result_fatal(&replay_result) {
@@ -960,6 +966,7 @@ impl ReplayStage {
         num_shreds: usize,
         slot_full: bool,
         transaction_status_sender: Option<TransactionStatusSender>,
+        verify_recyclers: &VerifyRecyclers,
     ) -> Result<()> {
         let result = Self::verify_and_process_entries(
             &bank,
@@ -968,6 +975,7 @@ impl ReplayStage {
             bank_progress.num_shreds,
             bank_progress,
             transaction_status_sender,
+            verify_recyclers,
         );
         bank_progress.num_shreds += num_shreds;
         bank_progress.num_entries += entries.len();
@@ -1020,6 +1028,7 @@ impl ReplayStage {
         shred_index: usize,
         bank_progress: &mut ForkProgress,
         transaction_status_sender: Option<TransactionStatusSender>,
+        recyclers: &VerifyRecyclers,
     ) -> Result<()> {
         let last_entry = &bank_progress.last_entry;
         let tick_hash_count = &mut bank_progress.tick_hash_count;
@@ -1051,7 +1060,7 @@ impl ReplayStage {
         datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
 
         let mut verify_total = Measure::start("verify_and_process_entries");
-        let mut entry_state = entries.start_verify(last_entry);
+        let mut entry_state = entries.start_verify(last_entry, recyclers.clone());
 
         let mut replay_elapsed = Measure::start("replay_elapsed");
         let res =
@@ -1749,6 +1758,7 @@ pub(crate) mod tests {
             &blocktree,
             &mut bank0_progress,
             None,
+            &VerifyRecyclers::default(),
         );
 
         // Check that the erroring bank was marked as dead in the progress map
diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs
index 039f69369..86c3cf4c2 100644
--- a/ledger/src/entry.rs
+++ b/ledger/src/entry.rs
@@ -10,12 +10,15 @@ use serde::{Deserialize, Serialize};
 use solana_measure::measure::Measure;
 use solana_merkle_tree::MerkleTree;
 use solana_metrics::*;
+use solana_perf::cuda_runtime::PinnedVec;
 use solana_perf::perf_libs;
+use solana_perf::recycler::Recycler;
 use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::hash::Hash;
 use solana_sdk::timing;
 use solana_sdk::transaction::Transaction;
 use std::cell::RefCell;
+use std::cmp;
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::{Arc, Mutex};
 use std::thread;
@@ -150,12 +153,18 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction
 
 pub struct EntryVerifyState {
     thread_h: Option<JoinHandle<u64>>,
-    hashes: Option<Arc<Mutex<Vec<Hash>>>>,
+    hashes: Option<Arc<Mutex<PinnedVec<Hash>>>>,
     verified: bool,
     tx_hashes: Vec<Option<Hash>>,
     start_time_ms: u64,
 }
 
+#[derive(Default, Clone)]
+pub struct VerifyRecyclers {
+    hash_recycler: Recycler<PinnedVec<Hash>>,
+    tick_count_recycler: Recycler<PinnedVec<u64>>,
+}
+
 impl EntryVerifyState {
     pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
         if self.hashes.is_some() {
@@ -175,9 +184,9 @@
                         .zip(entries)
                         .all(|((hash, tx_hash), answer)| {
                             if answer.num_hashes == 0 {
-                                hash == answer.hash
+                                *hash == answer.hash
                             } else {
-                                let mut poh = Poh::new(hash, None);
+                                let mut poh = Poh::new(*hash, None);
                                 if let Some(mixin) = tx_hash {
                                     poh.record(*mixin).unwrap().hash == answer.hash
                                 } else {
@@ -187,6 +196,7 @@
                         })
                     })
                 });
+            verify_check_time.stop();
 
             inc_new_counter_warn!(
                 "entry_verify-duration",
@@ -203,7 +213,7 @@
 pub trait EntrySlice {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
     fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState;
-    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState;
+    fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState;
     fn verify(&self, start_hash: &Hash) -> bool;
     /// Checks that each entry tick has the correct number of hashes. Entry slices do not
     /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count
@@ -215,7 +225,8 @@
 impl EntrySlice for [Entry] {
     fn verify(&self, start_hash: &Hash) -> bool {
-        self.start_verify(start_hash).finish_verify(self)
+        self.start_verify(start_hash, VerifyRecyclers::default())
+            .finish_verify(self)
     }
 
     fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState {
         let now = Instant::now();
@@ -254,7 +265,7 @@
         }
     }
 
-    fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState {
+    fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState {
         let api = perf_libs::api();
         if api.is_none() {
             return self.verify_cpu(start_hash);
@@ -277,13 +288,21 @@
             .take(self.len())
             .collect();
 
-        let num_hashes_vec: Vec<u64> = self
-            .iter()
-            .map(|entry| entry.num_hashes.saturating_sub(1))
-            .collect();
+        let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash");
+        hashes_pinned.set_pinnable();
+        hashes_pinned.resize(hashes.len(), Hash::default());
+        hashes_pinned.copy_from_slice(&hashes);
+
+        let mut num_hashes_vec = recyclers
+            .tick_count_recycler
+            .allocate("poh_verify_num_hashes");
+        num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
+        for entry in self {
+            num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
+        }
 
         let length = self.len();
-        let hashes = Arc::new(Mutex::new(hashes));
+        let hashes = Arc::new(Mutex::new(hashes_pinned));
         let hashes_clone = hashes.clone();
 
         let gpu_verify_thread = thread::spawn(move || {
diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs
index fda5277e2..ac094e4f7 100644
--- a/perf/src/cuda_runtime.rs
+++ b/perf/src/cuda_runtime.rs
@@ -181,6 +181,13 @@ impl<T: Clone + Default + Sized> PinnedVec<T> {
         self.pinnable = true;
     }
 
+    pub fn copy_from_slice(&mut self, data: &[T])
+    where
+        T: Copy,
+    {
+        self.x.copy_from_slice(data);
+    }
+
     pub fn from_vec(source: Vec<T>) -> Self {
         Self {
             x: source,