From 6ba4e870c4ed565f7f284604f6e85d39bb35a582 Mon Sep 17 00:00:00 2001
From: HaoranYi
Date: Mon, 4 Apr 2022 11:38:05 -0500
Subject: [PATCH] Blockstore should drop signals before validator exit (#24025)

* timeout for validator exits

* clippy

* print backtrace when panic

* add backtrace package

* increase time out to 30s

* debug logging

* make rpc complete service non blocking

* reduce log level

* remove logging

* recv_timeout

* remove backtrace

* remove sleep

* wip

* remove unused variable

* add comments

* Update core/src/validator.rs

Co-authored-by: Trent Nelson

* Update core/src/validator.rs

Co-authored-by: Trent Nelson

* whitespace

* more whitespace

* fix build

* clean up import

* add mutex for signal senders in blockstore

* remove mut

* refactor: extract add signal functions

* make blockstore signal private

* let compiler infer mutex type

Co-authored-by: Trent Nelson
---
 core/src/validator.rs    |  9 +++++++--
 ledger/src/blockstore.rs | 41 +++++++++++++++++++++++++++++++----------
 2 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/core/src/validator.rs b/core/src/validator.rs
index 4673cc9475..a0128f68a0 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -337,6 +337,7 @@ pub struct Validator {
     ip_echo_server: Option<solana_net_utils::IpEchoServer>,
     pub cluster_info: Arc<ClusterInfo>,
     pub bank_forks: Arc<RwLock<BankForks>>,
+    pub blockstore: Arc<Blockstore>,
     accountsdb_repl_service: Option<AccountsDbReplService>,
     geyser_plugin_service: Option<GeyserPluginService>,
 }
@@ -656,7 +657,7 @@ impl Validator {
             bank.ticks_per_slot(),
             &id,
             &blockstore,
-            blockstore.new_shreds_signals.first().cloned(),
+            blockstore.get_new_shred_signal(0),
             &leader_schedule_cache,
             &poh_config,
             Some(poh_timing_point_sender),
@@ -856,7 +857,7 @@ impl Validator {
             record_receiver,
         );
         assert_eq!(
-            blockstore.new_shreds_signals.len(),
+            blockstore.get_new_shred_signals_len(),
             1,
             "New shred signal for the TVU should be the same as the clear bank signal."
         );
@@ -994,6 +995,7 @@ impl Validator {
             validator_exit: config.validator_exit.clone(),
             cluster_info,
             bank_forks,
+            blockstore: blockstore.clone(),
             accountsdb_repl_service,
             geyser_plugin_service,
         }
@@ -1002,6 +1004,9 @@ impl Validator {
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&mut self) {
         self.validator_exit.write().unwrap().exit();
+
+        // drop all signals in blockstore
+        self.blockstore.drop_signal();
     }
 
     pub fn close(mut self) {
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 30ad490bb0..19a521e669 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -174,8 +174,8 @@ pub struct Blockstore {
     bank_hash_cf: LedgerColumn<cf::BankHash>,
     last_root: RwLock<Slot>,
     insert_shreds_lock: Mutex<()>,
-    pub new_shreds_signals: Vec<Sender<bool>>,
-    pub completed_slots_senders: Vec<CompletedSlotsSender>,
+    new_shreds_signals: Mutex<Vec<Sender<bool>>>,
+    completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
     pub shred_timing_point_sender: Option<PohTimingSender>,
     pub lowest_cleanup_slot: RwLock<Slot>,
     no_compaction: bool,
@@ -444,8 +444,8 @@ impl Blockstore {
             block_height_cf,
             program_costs_cf,
             bank_hash_cf,
-            new_shreds_signals: vec![],
-            completed_slots_senders: vec![],
+            new_shreds_signals: Mutex::default(),
+            completed_slots_senders: Mutex::default(),
             shred_timing_point_sender: None,
             insert_shreds_lock: Mutex::<()>::default(),
             last_root,
@@ -463,13 +463,13 @@ impl Blockstore {
         ledger_path: &Path,
         options: BlockstoreOptions,
     ) -> Result<BlockstoreSignals> {
-        let mut blockstore = Self::open_with_options(ledger_path, options)?;
+        let blockstore = Self::open_with_options(ledger_path, options)?;
         let (ledger_signal_sender, ledger_signal_receiver) = bounded(1);
         let (completed_slots_sender, completed_slots_receiver) =
             bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL);
 
-        blockstore.new_shreds_signals = vec![ledger_signal_sender];
-        blockstore.completed_slots_senders = vec![completed_slots_sender];
+        blockstore.add_new_shred_signal(ledger_signal_sender);
+        blockstore.add_completed_slots_signal(completed_slots_sender);
 
         Ok(BlockstoreSignals {
             blockstore,
@@ -1027,7 +1027,7 @@ impl Blockstore {
         let mut start = Measure::start("Commit Working Sets");
         let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
             &slot_meta_working_set,
-            &self.completed_slots_senders,
+            &self.completed_slots_senders.lock().unwrap(),
             &mut write_batch,
         )?;
 
@@ -1049,8 +1049,8 @@ impl Blockstore {
         metrics.write_batch_elapsed += start.as_us();
 
         send_signals(
-            &self.new_shreds_signals,
-            &self.completed_slots_senders,
+            &self.new_shreds_signals.lock().unwrap(),
+            &self.completed_slots_senders.lock().unwrap(),
             should_signal,
             newly_completed_slots,
         );
@@ -1063,6 +1063,27 @@ impl Blockstore {
         Ok((newly_completed_data_sets, inserted_indices))
     }
 
+    pub fn add_new_shred_signal(&self, s: Sender<bool>) {
+        self.new_shreds_signals.lock().unwrap().push(s);
+    }
+
+    pub fn add_completed_slots_signal(&self, s: CompletedSlotsSender) {
+        self.completed_slots_senders.lock().unwrap().push(s);
+    }
+
+    pub fn get_new_shred_signals_len(&self) -> usize {
+        self.new_shreds_signals.lock().unwrap().len()
+    }
+
+    pub fn get_new_shred_signal(&self, index: usize) -> Option<Sender<bool>> {
+        self.new_shreds_signals.lock().unwrap().get(index).cloned()
+    }
+
+    pub fn drop_signal(&self) {
+        self.new_shreds_signals.lock().unwrap().clear();
+        self.completed_slots_senders.lock().unwrap().clear();
+    }
+
     /// Range-delete all entries which prefix matches the specified `slot` and
     /// clear all the related `SlotMeta` except its next_slots.
     ///
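
Note (not part of the patch): the fix relies on channel disconnection. Services block on the receiving end of the blockstore's signal channels, so once `Validator::exit()` calls `Blockstore::drop_signal()` and the sender vectors are cleared, every blocked `recv()` returns an error and the waiting threads can shut down instead of hanging. Below is a minimal standalone sketch of that pattern, assuming the crossbeam-channel crate; the `Signals` type and its methods are hypothetical stand-ins for illustration, not Solana APIs.

// Standalone sketch (not Solana code): dropping every Sender disconnects the
// channel, so a thread blocked in recv() wakes up with RecvError and can exit.
use crossbeam_channel::{bounded, Sender};
use std::{sync::Mutex, thread};

struct Signals {
    // Mirrors the shape of the patched Blockstore field.
    new_shreds_signals: Mutex<Vec<Sender<bool>>>,
}

impl Signals {
    // Analogous to Blockstore::drop_signal(): clearing the Vec drops the senders.
    fn drop_signal(&self) {
        self.new_shreds_signals.lock().unwrap().clear();
    }
}

fn main() {
    let (sender, receiver) = bounded::<bool>(1);
    let signals = Signals {
        new_shreds_signals: Mutex::new(vec![sender]),
    };

    // Simulates a service thread waiting on new-shred signals.
    let waiter = thread::spawn(move || {
        // recv() returns Err(RecvError) once all senders have been dropped.
        while receiver.recv().is_ok() {}
        println!("all senders dropped; exiting");
    });

    // Simulates Validator::exit(): drop the signals so the waiter can finish.
    signals.drop_signal();
    waiter.join().unwrap();
}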