From 76e20015a46f42bbf2cbaba2aea2bd7d3eda17a8 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 16 Jan 2020 15:27:54 -0800 Subject: [PATCH] Add separate thread to check for and store duplicate slot proofs (#7834) --- core/src/window_service.rs | 124 +++++++++++++++++++++++++++++++++---- ledger/src/blockstore.rs | 27 +++++--- 2 files changed, 132 insertions(+), 19 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7d563c874..18fef0c98 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -73,11 +73,44 @@ pub fn should_retransmit_and_persist( } } -fn run_insert( +fn run_check_duplicate( + blockstore: &Arc, + shred_receiver: &CrossbeamReceiver, +) -> Result<()> { + let check_duplicate = |shred: Shred| -> Result<()> { + if !blockstore.has_duplicate_shreds_in_slot(shred.slot()) { + if let Some(existing_shred_payload) = + blockstore.is_shred_duplicate(shred.slot(), shred.index(), &shred.payload) + { + blockstore.store_duplicate_slot( + shred.slot(), + existing_shred_payload, + shred.payload, + )?; + } + } + + Ok(()) + }; + let timer = Duration::from_millis(200); + let shred = shred_receiver.recv_timeout(timer)?; + check_duplicate(shred)?; + while let Ok(shred) = shred_receiver.try_recv() { + check_duplicate(shred)?; + } + + Ok(()) +} + +fn run_insert( shred_receiver: &CrossbeamReceiver>, blockstore: &Arc, leader_schedule_cache: &Arc, -) -> Result<()> { + handle_duplicate: F, +) -> Result<()> +where + F: Fn(Shred) -> (), +{ let timer = Duration::from_millis(200); let mut shreds = shred_receiver.recv_timeout(timer)?; @@ -85,8 +118,12 @@ fn run_insert( shreds.append(&mut more_shreds) } - let blockstore_insert_metrics = - blockstore.insert_shreds(shreds, Some(leader_schedule_cache), false)?; + let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate( + shreds, + Some(leader_schedule_cache), + false, + &handle_duplicate, + )?; blockstore_insert_metrics.report_metrics("recv-window-insert-shreds"); Ok(()) @@ -199,6 +236,7 @@ impl Drop for Finalizer { pub struct WindowService { t_window: JoinHandle<()>, t_insert: JoinHandle<()>, + t_check_duplicate: JoinHandle<()>, repair_service: RepairService, } @@ -235,12 +273,17 @@ impl WindowService { ); let (insert_sender, insert_receiver) = unbounded(); + let (duplicate_sender, duplicate_receiver) = unbounded(); + + let t_check_duplicate = + Self::start_check_duplicate_thread(exit, &blockstore, duplicate_receiver); let t_insert = Self::start_window_insert_thread( exit, &blockstore, leader_schedule_cache, insert_receiver, + duplicate_sender, ); let t_window = Self::start_recv_window_thread( @@ -257,15 +300,44 @@ impl WindowService { WindowService { t_window, t_insert, + t_check_duplicate, repair_service, } } + fn start_check_duplicate_thread( + exit: &Arc, + blockstore: &Arc, + duplicate_receiver: CrossbeamReceiver, + ) -> JoinHandle<()> { + let exit = exit.clone(); + let blockstore = blockstore.clone(); + let handle_error = || { + inc_new_counter_error!("solana-check-duplicate-error", 1, 1); + }; + Builder::new() + .name("solana-check-duplicate".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let mut noop = || {}; + if let Err(e) = run_check_duplicate(&blockstore, &duplicate_receiver) { + if Self::should_exit_on_error(e, &mut noop, &handle_error) { + break; + } + } + }) + .unwrap() + } + fn start_window_insert_thread( exit: &Arc, blockstore: &Arc, leader_schedule_cache: &Arc, insert_receiver: CrossbeamReceiver>, + duplicate_sender: CrossbeamSender, ) -> JoinHandle<()> { let exit = exit.clone(); let blockstore = blockstore.clone(); @@ -274,17 +346,28 @@ impl WindowService { let handle_error = || { inc_new_counter_error!("solana-window-insert-error", 1, 1); }; + Builder::new() .name("solana-window-insert".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - - if let Err(e) = run_insert(&insert_receiver, &blockstore, &leader_schedule_cache) { - if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { + .spawn(move || { + let handle_duplicate = |shred| { + let _ = duplicate_sender.send(shred); + }; + loop { + if exit.load(Ordering::Relaxed) { break; } + + if let Err(e) = run_insert( + &insert_receiver, + &blockstore, + &leader_schedule_cache, + &handle_duplicate, + ) { + if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { + break; + } + } } }) .unwrap() @@ -384,6 +467,7 @@ impl WindowService { pub fn join(self) -> thread::Result<()> { self.t_window.join()?; self.t_insert.join()?; + self.t_check_duplicate.join()?; self.repair_service.join() } } @@ -587,4 +671,22 @@ mod test { exit.store(true, Ordering::Relaxed); window.join().unwrap(); } + + #[test] + fn test_run_check_duplicate() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); + let (sender, receiver) = unbounded(); + let (shreds, _) = make_many_slot_entries(5, 5, 10); + blockstore + .insert_shreds(shreds.clone(), None, false) + .unwrap(); + let mut duplicate_shred = shreds[1].clone(); + duplicate_shred.set_slot(shreds[0].slot()); + let duplicate_shred_slot = duplicate_shred.slot(); + sender.send(duplicate_shred).unwrap(); + assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); + run_check_duplicate(&blockstore, &receiver).unwrap(); + assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); + } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c4298f1e6..1111f16cc 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1668,16 +1668,22 @@ impl Blockstore { } // `new_shred` is asssumed to have slot and index equal to the given slot and index. - // Returns true if `new_shred` is not equal to the existing shred at the given - // slot and index as this implies the leader generated two different shreds with + // Returns the existing shred if `new_shred` is not equal to the existing shred at the + // given slot and index as this implies the leader generated two different shreds with // the same slot and index - pub fn is_shred_duplicate(&self, slot: u64, index: u64, new_shred: &[u8]) -> bool { + pub fn is_shred_duplicate(&self, slot: u64, index: u32, new_shred: &[u8]) -> Option> { let res = self - .get_data_shred(slot, index) + .get_data_shred(slot, index as u64) .expect("fetch from DuplicateSlots column family failed"); - res.map(|existing_shred| existing_shred != new_shred) - .unwrap_or(false) + res.map(|existing_shred| { + if existing_shred != new_shred { + Some(existing_shred) + } else { + None + } + }) + .unwrap_or(None) } pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool { @@ -5286,8 +5292,13 @@ pub mod tests { assert!(!blockstore.has_duplicate_shreds_in_slot(slot)); // Check if shreds are duplicated - assert!(blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload)); - assert!(!blockstore.is_shred_duplicate(slot, 0, &non_duplicate_shred.payload)); + assert_eq!( + blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload), + Some(shred.payload.clone()) + ); + assert!(blockstore + .is_shred_duplicate(slot, 0, &non_duplicate_shred.payload) + .is_none()); // Store a duplicate shred blockstore