Optimize replay waking up (#24051)

* timeout for validator exits

* clippy

* print backtrace when panic

* add backtrace package

* increase time out to 30s

* debug logging

* make rpc complete service non blocking

* reduce log level

* remove logging

* recv_timeout

* remove backtrace

* remove sleep

* wip

* remove unused variable

* add comments

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* whitespace

* more whitespace

* fix build

* clean up import

* add mutex for signal senders in blockstore

* remove mut

* refactor: extract add signal functions

* make blockstore signal private

* increase replay wake up channel bounds

* reduce replay wakeup signal bound to 1

* reduce log level

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
This commit is contained in:
HaoranYi 2022-04-05 08:57:12 -05:00 committed by GitHub
parent 4a11fa072f
commit eb65ffb779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 4 deletions

View File

@ -90,6 +90,7 @@ thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell<ThreadPool> = RefCell::ne
.build() .build()
.unwrap())); .unwrap()));
pub const MAX_REPLAY_WAKE_UP_SIGNALS: usize = 1;
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
@ -464,7 +465,7 @@ impl Blockstore {
options: BlockstoreOptions, options: BlockstoreOptions,
) -> Result<BlockstoreSignals> { ) -> Result<BlockstoreSignals> {
let blockstore = Self::open_with_options(ledger_path, options)?; let blockstore = Self::open_with_options(ledger_path, options)?;
let (ledger_signal_sender, ledger_signal_receiver) = bounded(1); let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
let (completed_slots_sender, completed_slots_receiver) = let (completed_slots_sender, completed_slots_receiver) =
bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL); bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
@ -3427,7 +3428,15 @@ fn send_signals(
) { ) {
if should_signal { if should_signal {
for signal in new_shreds_signals { for signal in new_shreds_signals {
let _ = signal.try_send(true); match signal.try_send(true) {
Ok(_) => {}
Err(TrySendError::Full(_)) => {
trace!("replay wake up signal channel is full.")
}
Err(TrySendError::Disconnected(_)) => {
trace!("replay wake up signal channel is disconnected.")
}
}
} }
} }

View File

@ -13,7 +13,7 @@
pub use solana_sdk::clock::Slot; pub use solana_sdk::clock::Slot;
use { use {
crate::poh_service::PohService, crate::poh_service::PohService,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender}, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
log::*, log::*,
solana_entry::{entry::Entry, poh::Poh}, solana_entry::{entry::Entry, poh::Poh},
solana_ledger::{ solana_ledger::{
@ -262,7 +262,15 @@ impl PohRecorder {
} }
if let Some(ref signal) = self.clear_bank_signal { if let Some(ref signal) = self.clear_bank_signal {
let _ = signal.try_send(true); match signal.try_send(true) {
Ok(_) => {}
Err(TrySendError::Full(_)) => {
trace!("replay wake up signal channel is full.")
}
Err(TrySendError::Disconnected(_)) => {
trace!("replay wake up signal channel is disconnected.")
}
}
} }
} }