Use pinned memory for entry verify (#7440)
This commit is contained in:
parent
c4f3bb9b67
commit
dd54fff978
|
@ -14,7 +14,7 @@ use solana_ledger::{
|
||||||
block_error::BlockError,
|
block_error::BlockError,
|
||||||
blocktree::{Blocktree, BlocktreeError},
|
blocktree::{Blocktree, BlocktreeError},
|
||||||
blocktree_processor::{self, TransactionStatusSender},
|
blocktree_processor::{self, TransactionStatusSender},
|
||||||
entry::{Entry, EntrySlice},
|
entry::{Entry, EntrySlice, VerifyRecyclers},
|
||||||
leader_schedule_cache::LeaderScheduleCache,
|
leader_schedule_cache::LeaderScheduleCache,
|
||||||
snapshot_package::SnapshotPackageSender,
|
snapshot_package::SnapshotPackageSender,
|
||||||
};
|
};
|
||||||
|
@ -210,6 +210,7 @@ impl ReplayStage {
|
||||||
let t_replay = Builder::new()
|
let t_replay = Builder::new()
|
||||||
.name("solana-replay-stage".to_string())
|
.name("solana-replay-stage".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
let verify_recyclers = VerifyRecyclers::default();
|
||||||
let _exit = Finalizer::new(exit.clone());
|
let _exit = Finalizer::new(exit.clone());
|
||||||
let mut progress = HashMap::new();
|
let mut progress = HashMap::new();
|
||||||
// Initialize progress map with any root banks
|
// Initialize progress map with any root banks
|
||||||
|
@ -258,6 +259,7 @@ impl ReplayStage {
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&slot_full_senders,
|
&slot_full_senders,
|
||||||
transaction_status_sender.clone(),
|
transaction_status_sender.clone(),
|
||||||
|
&verify_recyclers,
|
||||||
);
|
);
|
||||||
datapoint_debug!(
|
datapoint_debug!(
|
||||||
"replay_stage-memory",
|
"replay_stage-memory",
|
||||||
|
@ -541,6 +543,7 @@ impl ReplayStage {
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
bank_progress: &mut ForkProgress,
|
bank_progress: &mut ForkProgress,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
|
verify_recyclers: &VerifyRecyclers,
|
||||||
) -> (Result<()>, usize) {
|
) -> (Result<()>, usize) {
|
||||||
let mut tx_count = 0;
|
let mut tx_count = 0;
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
@ -569,6 +572,7 @@ impl ReplayStage {
|
||||||
num_shreds,
|
num_shreds,
|
||||||
slot_full,
|
slot_full,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
verify_recyclers,
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -719,6 +723,7 @@ impl ReplayStage {
|
||||||
progress: &mut HashMap<u64, ForkProgress>,
|
progress: &mut HashMap<u64, ForkProgress>,
|
||||||
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
|
verify_recyclers: &VerifyRecyclers,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let mut did_complete_bank = false;
|
let mut did_complete_bank = false;
|
||||||
let mut tx_count = 0;
|
let mut tx_count = 0;
|
||||||
|
@ -746,6 +751,7 @@ impl ReplayStage {
|
||||||
&blocktree,
|
&blocktree,
|
||||||
bank_progress,
|
bank_progress,
|
||||||
transaction_status_sender.clone(),
|
transaction_status_sender.clone(),
|
||||||
|
verify_recyclers,
|
||||||
);
|
);
|
||||||
tx_count += replay_tx_count;
|
tx_count += replay_tx_count;
|
||||||
if Self::is_replay_result_fatal(&replay_result) {
|
if Self::is_replay_result_fatal(&replay_result) {
|
||||||
|
@ -960,6 +966,7 @@ impl ReplayStage {
|
||||||
num_shreds: usize,
|
num_shreds: usize,
|
||||||
slot_full: bool,
|
slot_full: bool,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
|
verify_recyclers: &VerifyRecyclers,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let result = Self::verify_and_process_entries(
|
let result = Self::verify_and_process_entries(
|
||||||
&bank,
|
&bank,
|
||||||
|
@ -968,6 +975,7 @@ impl ReplayStage {
|
||||||
bank_progress.num_shreds,
|
bank_progress.num_shreds,
|
||||||
bank_progress,
|
bank_progress,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
verify_recyclers,
|
||||||
);
|
);
|
||||||
bank_progress.num_shreds += num_shreds;
|
bank_progress.num_shreds += num_shreds;
|
||||||
bank_progress.num_entries += entries.len();
|
bank_progress.num_entries += entries.len();
|
||||||
|
@ -1020,6 +1028,7 @@ impl ReplayStage {
|
||||||
shred_index: usize,
|
shred_index: usize,
|
||||||
bank_progress: &mut ForkProgress,
|
bank_progress: &mut ForkProgress,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
|
recyclers: &VerifyRecyclers,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let last_entry = &bank_progress.last_entry;
|
let last_entry = &bank_progress.last_entry;
|
||||||
let tick_hash_count = &mut bank_progress.tick_hash_count;
|
let tick_hash_count = &mut bank_progress.tick_hash_count;
|
||||||
|
@ -1051,7 +1060,7 @@ impl ReplayStage {
|
||||||
|
|
||||||
datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
|
datapoint_debug!("verify-batch-size", ("size", entries.len() as i64, i64));
|
||||||
let mut verify_total = Measure::start("verify_and_process_entries");
|
let mut verify_total = Measure::start("verify_and_process_entries");
|
||||||
let mut entry_state = entries.start_verify(last_entry);
|
let mut entry_state = entries.start_verify(last_entry, recyclers.clone());
|
||||||
|
|
||||||
let mut replay_elapsed = Measure::start("replay_elapsed");
|
let mut replay_elapsed = Measure::start("replay_elapsed");
|
||||||
let res =
|
let res =
|
||||||
|
@ -1749,6 +1758,7 @@ pub(crate) mod tests {
|
||||||
&blocktree,
|
&blocktree,
|
||||||
&mut bank0_progress,
|
&mut bank0_progress,
|
||||||
None,
|
None,
|
||||||
|
&VerifyRecyclers::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Check that the erroring bank was marked as dead in the progress map
|
// Check that the erroring bank was marked as dead in the progress map
|
||||||
|
|
|
@ -10,12 +10,15 @@ use serde::{Deserialize, Serialize};
|
||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_merkle_tree::MerkleTree;
|
use solana_merkle_tree::MerkleTree;
|
||||||
use solana_metrics::*;
|
use solana_metrics::*;
|
||||||
|
use solana_perf::cuda_runtime::PinnedVec;
|
||||||
use solana_perf::perf_libs;
|
use solana_perf::perf_libs;
|
||||||
|
use solana_perf::recycler::Recycler;
|
||||||
use solana_rayon_threadlimit::get_thread_count;
|
use solana_rayon_threadlimit::get_thread_count;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::timing;
|
use solana_sdk::timing;
|
||||||
use solana_sdk::transaction::Transaction;
|
use solana_sdk::transaction::Transaction;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
use std::cmp;
|
||||||
use std::sync::mpsc::{Receiver, Sender};
|
use std::sync::mpsc::{Receiver, Sender};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
@ -150,12 +153,18 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction
|
||||||
|
|
||||||
pub struct EntryVerifyState {
|
pub struct EntryVerifyState {
|
||||||
thread_h: Option<JoinHandle<u64>>,
|
thread_h: Option<JoinHandle<u64>>,
|
||||||
hashes: Option<Arc<Mutex<Vec<Hash>>>>,
|
hashes: Option<Arc<Mutex<PinnedVec<Hash>>>>,
|
||||||
verified: bool,
|
verified: bool,
|
||||||
tx_hashes: Vec<Option<Hash>>,
|
tx_hashes: Vec<Option<Hash>>,
|
||||||
start_time_ms: u64,
|
start_time_ms: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Bundle of recyclers passed to `EntrySlice::start_verify`, one per kind of
/// `PinnedVec` buffer that function allocates.
#[derive(Default, Clone)]
|
||||||
|
pub struct VerifyRecyclers {
|
||||||
|
/// Supplies the `PinnedVec<Hash>` that `start_verify` fills with the hashes to verify.
hash_recycler: Recycler<PinnedVec<Hash>>,
|
||||||
|
/// Supplies the `PinnedVec<u64>` that `start_verify` fills with each entry's
/// `num_hashes.saturating_sub(1)`.
tick_count_recycler: Recycler<PinnedVec<u64>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl EntryVerifyState {
|
impl EntryVerifyState {
|
||||||
pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
|
pub fn finish_verify(&mut self, entries: &[Entry]) -> bool {
|
||||||
if self.hashes.is_some() {
|
if self.hashes.is_some() {
|
||||||
|
@ -175,9 +184,9 @@ impl EntryVerifyState {
|
||||||
.zip(entries)
|
.zip(entries)
|
||||||
.all(|((hash, tx_hash), answer)| {
|
.all(|((hash, tx_hash), answer)| {
|
||||||
if answer.num_hashes == 0 {
|
if answer.num_hashes == 0 {
|
||||||
hash == answer.hash
|
*hash == answer.hash
|
||||||
} else {
|
} else {
|
||||||
let mut poh = Poh::new(hash, None);
|
let mut poh = Poh::new(*hash, None);
|
||||||
if let Some(mixin) = tx_hash {
|
if let Some(mixin) = tx_hash {
|
||||||
poh.record(*mixin).unwrap().hash == answer.hash
|
poh.record(*mixin).unwrap().hash == answer.hash
|
||||||
} else {
|
} else {
|
||||||
|
@ -187,6 +196,7 @@ impl EntryVerifyState {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
verify_check_time.stop();
|
verify_check_time.stop();
|
||||||
inc_new_counter_warn!(
|
inc_new_counter_warn!(
|
||||||
"entry_verify-duration",
|
"entry_verify-duration",
|
||||||
|
@ -203,7 +213,7 @@ impl EntryVerifyState {
|
||||||
pub trait EntrySlice {
|
pub trait EntrySlice {
|
||||||
/// Verifies the hashes and counts of a slice of transactions are all consistent.
|
/// Verifies the hashes and counts of a slice of transactions are all consistent.
|
||||||
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState;
|
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState;
|
||||||
fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState;
|
fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState;
|
||||||
fn verify(&self, start_hash: &Hash) -> bool;
|
fn verify(&self, start_hash: &Hash) -> bool;
|
||||||
/// Checks that each entry tick has the correct number of hashes. Entry slices do not
|
/// Checks that each entry tick has the correct number of hashes. Entry slices do not
|
||||||
/// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count
|
/// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count
|
||||||
|
@ -215,7 +225,8 @@ pub trait EntrySlice {
|
||||||
|
|
||||||
impl EntrySlice for [Entry] {
|
impl EntrySlice for [Entry] {
|
||||||
fn verify(&self, start_hash: &Hash) -> bool {
|
fn verify(&self, start_hash: &Hash) -> bool {
|
||||||
self.start_verify(start_hash).finish_verify(self)
|
self.start_verify(start_hash, VerifyRecyclers::default())
|
||||||
|
.finish_verify(self)
|
||||||
}
|
}
|
||||||
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState {
|
fn verify_cpu(&self, start_hash: &Hash) -> EntryVerifyState {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
@ -254,7 +265,7 @@ impl EntrySlice for [Entry] {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_verify(&self, start_hash: &Hash) -> EntryVerifyState {
|
fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) -> EntryVerifyState {
|
||||||
let api = perf_libs::api();
|
let api = perf_libs::api();
|
||||||
if api.is_none() {
|
if api.is_none() {
|
||||||
return self.verify_cpu(start_hash);
|
return self.verify_cpu(start_hash);
|
||||||
|
@ -277,13 +288,21 @@ impl EntrySlice for [Entry] {
|
||||||
.take(self.len())
|
.take(self.len())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let num_hashes_vec: Vec<u64> = self
|
let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash");
|
||||||
.iter()
|
hashes_pinned.set_pinnable();
|
||||||
.map(|entry| entry.num_hashes.saturating_sub(1))
|
hashes_pinned.resize(hashes.len(), Hash::default());
|
||||||
.collect();
|
hashes_pinned.copy_from_slice(&hashes);
|
||||||
|
|
||||||
|
let mut num_hashes_vec = recyclers
|
||||||
|
.tick_count_recycler
|
||||||
|
.allocate("poh_verify_num_hashes");
|
||||||
|
num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
|
||||||
|
for entry in self {
|
||||||
|
num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
|
||||||
|
}
|
||||||
|
|
||||||
let length = self.len();
|
let length = self.len();
|
||||||
let hashes = Arc::new(Mutex::new(hashes));
|
let hashes = Arc::new(Mutex::new(hashes_pinned));
|
||||||
let hashes_clone = hashes.clone();
|
let hashes_clone = hashes.clone();
|
||||||
|
|
||||||
let gpu_verify_thread = thread::spawn(move || {
|
let gpu_verify_thread = thread::spawn(move || {
|
||||||
|
|
|
@ -181,6 +181,13 @@ impl<T: Clone + Default + Sized> PinnedVec<T> {
|
||||||
self.pinnable = true;
|
self.pinnable = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Copies every element of `data` into the backing storage `self.x`.
///
/// This forwards directly to `copy_from_slice` on `self.x` and does not resize it,
/// so the caller must make the length equal to `data.len()` beforehand.
///
/// # Panics
///
/// NOTE(review): assuming `x` is the `Vec<T>` seen in `from_vec`, this is the std
/// slice `copy_from_slice`, which panics when the two lengths differ.
pub fn copy_from_slice(&mut self, data: &[T])
|
||||||
|
where
|
||||||
|
T: Copy,
|
||||||
|
{
|
||||||
|
self.x.copy_from_slice(data);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn from_vec(source: Vec<T>) -> Self {
|
pub fn from_vec(source: Vec<T>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
x: source,
|
x: source,
|
||||||
|
|
Loading…
Reference in New Issue