Blockstore should drop signals before validator exit (#24025)

* timeout for validator exits

* clippy

* print backtrace on panic

* add backtrace package

* increase timeout to 30s

* debug logging

* make rpc complete service non-blocking

* reduce log level

* remove logging

* recv_timeout

* remove backtrace

* remove sleep

* wip

* remove unused variable

* add comments

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/validator.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* whitespace

* more whitespace

* fix build

* clean up import

* add mutex for signal senders in blockstore

* remove mut

* refactor: extract add signal functions

* make blockstore signal private

* let compiler infer mutex type

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
HaoranYi authored 2022-04-04 11:38:05 -05:00, committed by GitHub
parent f8f3edac3c
commit 6ba4e870c4
2 changed files with 38 additions and 12 deletions
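
The squashed history above suggests the fix was first attempted with shutdown timeouts (the "timeout for validator exits", "increase timeout to 30s", and "recv_timeout" entries); the final diff instead clears the blockstore's channel senders when the validator exits. Below is a minimal standalone sketch (not part of the patch) of the crossbeam-channel behavior this relies on: once every Sender is dropped and the channel is drained, a blocked recv() returns Err, so the waiting thread can move on without any timeout.

use crossbeam_channel::bounded;
use std::thread;

fn main() {
    let (sender, receiver) = bounded::<bool>(1);

    let consumer = thread::spawn(move || {
        // Parked here until a signal arrives or every sender is dropped.
        while receiver.recv().is_ok() {
            // ... react to a new-shred signal ...
        }
        // recv() returned Err: no senders remain, so the loop ends.
    });

    drop(sender); // analogous to Blockstore::drop_signal() clearing its sender lists
    consumer.join().unwrap(); // returns promptly, no recv_timeout needed
}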

core/src/validator.rs

@@ -337,6 +337,7 @@ pub struct Validator {
     ip_echo_server: Option<solana_net_utils::IpEchoServer>,
     pub cluster_info: Arc<ClusterInfo>,
     pub bank_forks: Arc<RwLock<BankForks>>,
+    pub blockstore: Arc<Blockstore>,
     accountsdb_repl_service: Option<AccountsDbReplService>,
     geyser_plugin_service: Option<GeyserPluginService>,
 }
@@ -656,7 +657,7 @@ impl Validator {
             bank.ticks_per_slot(),
             &id,
             &blockstore,
-            blockstore.new_shreds_signals.first().cloned(),
+            blockstore.get_new_shred_signal(0),
             &leader_schedule_cache,
             &poh_config,
             Some(poh_timing_point_sender),
@@ -856,7 +857,7 @@ impl Validator {
             record_receiver,
         );
         assert_eq!(
-            blockstore.new_shreds_signals.len(),
+            blockstore.get_new_shred_signals_len(),
             1,
             "New shred signal for the TVU should be the same as the clear bank signal."
         );
@@ -994,6 +995,7 @@ impl Validator {
             validator_exit: config.validator_exit.clone(),
             cluster_info,
             bank_forks,
+            blockstore: blockstore.clone(),
             accountsdb_repl_service,
             geyser_plugin_service,
         }
@@ -1002,6 +1004,9 @@ impl Validator {
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&mut self) {
         self.validator_exit.write().unwrap().exit();
+
+        // drop all signals in blockstore
+        self.blockstore.drop_signal();
     }

     pub fn close(mut self) {
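
On the consumer side, a hedged sketch (simplified names, not the actual TVU code) of how a service thread that waits on the new-shred signal unblocks: exit() sets the exit flag and then drop_signal() clears the senders, so the thread either sees the flag on its next check or is woken by a channel error and stops waiting.

use crossbeam_channel::Receiver;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

fn shred_signal_loop(exit: Arc<AtomicBool>, new_shred_signal: Receiver<bool>) {
    while !exit.load(Ordering::Relaxed) {
        // Blocks until a shred-insertion signal arrives; Err means every
        // sender has been dropped (e.g. by Validator::exit()), so stop waiting.
        if new_shred_signal.recv().is_err() {
            break;
        }
        // ... process newly inserted shreds ...
    }
}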

ledger/src/blockstore.rs

@@ -174,8 +174,8 @@ pub struct Blockstore {
     bank_hash_cf: LedgerColumn<cf::BankHash>,
     last_root: RwLock<Slot>,
     insert_shreds_lock: Mutex<()>,
-    pub new_shreds_signals: Vec<Sender<bool>>,
-    pub completed_slots_senders: Vec<CompletedSlotsSender>,
+    new_shreds_signals: Mutex<Vec<Sender<bool>>>,
+    completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
     pub shred_timing_point_sender: Option<PohTimingSender>,
     pub lowest_cleanup_slot: RwLock<Slot>,
     no_compaction: bool,
@@ -444,8 +444,8 @@ impl Blockstore {
             block_height_cf,
             program_costs_cf,
             bank_hash_cf,
-            new_shreds_signals: vec![],
-            completed_slots_senders: vec![],
+            new_shreds_signals: Mutex::default(),
+            completed_slots_senders: Mutex::default(),
             shred_timing_point_sender: None,
             insert_shreds_lock: Mutex::<()>::default(),
             last_root,
@@ -463,13 +463,13 @@ impl Blockstore {
         ledger_path: &Path,
         options: BlockstoreOptions,
     ) -> Result<BlockstoreSignals> {
-        let mut blockstore = Self::open_with_options(ledger_path, options)?;
+        let blockstore = Self::open_with_options(ledger_path, options)?;
         let (ledger_signal_sender, ledger_signal_receiver) = bounded(1);
         let (completed_slots_sender, completed_slots_receiver) =
             bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);

-        blockstore.new_shreds_signals = vec![ledger_signal_sender];
-        blockstore.completed_slots_senders = vec![completed_slots_sender];
+        blockstore.add_new_shred_signal(ledger_signal_sender);
+        blockstore.add_completed_slots_signal(completed_slots_sender);

         Ok(BlockstoreSignals {
             blockstore,
@@ -1027,7 +1027,7 @@ impl Blockstore {
         let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
             &slot_meta_working_set,
-            &self.completed_slots_senders,
+            &self.completed_slots_senders.lock().unwrap(),
             &mut write_batch,
         )?;
@@ -1049,8 +1049,8 @@ impl Blockstore {
         metrics.write_batch_elapsed += start.as_us();

         send_signals(
-            &self.new_shreds_signals,
-            &self.completed_slots_senders,
+            &self.new_shreds_signals.lock().unwrap(),
+            &self.completed_slots_senders.lock().unwrap(),
             should_signal,
             newly_completed_slots,
         );
@@ -1063,6 +1063,27 @@ impl Blockstore {
         Ok((newly_completed_data_sets, inserted_indices))
     }

+    pub fn add_new_shred_signal(&self, s: Sender<bool>) {
+        self.new_shreds_signals.lock().unwrap().push(s);
+    }
+
+    pub fn add_completed_slots_signal(&self, s: CompletedSlotsSender) {
+        self.completed_slots_senders.lock().unwrap().push(s);
+    }
+
+    pub fn get_new_shred_signals_len(&self) -> usize {
+        self.new_shreds_signals.lock().unwrap().len()
+    }
+
+    pub fn get_new_shred_signal(&self, index: usize) -> Option<Sender<bool>> {
+        self.new_shreds_signals.lock().unwrap().get(index).cloned()
+    }
+
+    pub fn drop_signal(&self) {
+        self.new_shreds_signals.lock().unwrap().clear();
+        self.completed_slots_senders.lock().unwrap().clear();
+    }
+
     /// Range-delete all entries which prefix matches the specified `slot` and
     /// clear all the related `SlotMeta` except its next_slots.
     ///
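
A hedged usage sketch of the new accessors (the function and its setup are illustrative only, assuming an already-opened Blockstore and the solana_ledger crate path): senders are registered with add_new_shred_signal, a consumer can clone one via get_new_shred_signal, and drop_signal() clears them all so receivers observe Err after draining any pending value.

use crossbeam_channel::bounded;
use solana_ledger::blockstore::Blockstore; // crate path assumed for this sketch

fn exercise_signals(blockstore: &Blockstore) {
    let (sender, receiver) = bounded::<bool>(1);
    blockstore.add_new_shred_signal(sender);
    assert_eq!(blockstore.get_new_shred_signals_len(), 1);

    // A consumer may clone the registered sender and signal through it.
    if let Some(signal) = blockstore.get_new_shred_signal(0) {
        let _ = signal.send(true);
    } // the cloned sender is dropped at the end of this block

    // Clearing the stored senders leaves no Sender alive, so the receiver
    // drains the pending value and then sees Err on the next recv().
    blockstore.drop_signal();
    assert_eq!(receiver.recv(), Ok(true));
    assert!(receiver.recv().is_err());
}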