diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f9d6bb14c2..1eb0b200e4 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -6,10 +6,13 @@ use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::streamer::PacketSender; -use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; +use crossbeam_channel::{ + unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, +}; use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; +use solana_ledger::bank_forks::BankForks; use solana_ledger::blocktree::{self, Blocktree}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::Shred; @@ -67,14 +70,33 @@ pub fn should_retransmit_and_persist( } } +fn run_insert( + shred_receiver: &CrossbeamReceiver>, + blocktree: &Arc, + leader_schedule_cache: &Arc, +) -> Result<()> { + let timer = Duration::from_millis(200); + let mut shreds = shred_receiver.recv_timeout(timer)?; + + while let Ok(mut more_shreds) = shred_receiver.try_recv() { + shreds.append(&mut more_shreds) + } + + let blocktree_insert_metrics = + blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?; + blocktree_insert_metrics.report_metrics("recv-window-insert-shreds"); + + Ok(()) +} + fn recv_window( blocktree: &Arc, + insert_shred_sender: &CrossbeamSender>, my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, retransmit: &PacketSender, shred_filter: F, thread_pool: &ThreadPool, - leader_schedule_cache: &Arc, ) -> Result<()> where F: Fn(&Shred, u64) -> bool + Sync, @@ -136,9 +158,7 @@ where } } - let blocktree_insert_metrics = - blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?; - blocktree_insert_metrics.report_metrics("recv-window-insert-shreds"); + insert_shred_sender.send(shreds)?; trace!( "Elapsed processing time in recv_window(): {}", @@ -168,6 +188,7 @@ impl Drop for Finalizer { pub struct WindowService { t_window: JoinHandle<()>, + t_insert: JoinHandle<()>, repair_service: RepairService, } @@ -192,7 +213,6 @@ impl WindowService { { let bank_forks = match repair_strategy { RepairStrategy::RepairRange(_) => None, - RepairStrategy::RepairAll { ref bank_forks, .. } => Some(bank_forks.clone()), }; @@ -203,28 +223,109 @@ impl WindowService { cluster_info.clone(), repair_strategy, ); + + let (insert_sender, insert_receiver) = unbounded(); + + let t_insert = Self::start_window_insert_thread( + exit, + &blocktree, + leader_schedule_cache, + insert_receiver, + ); + + let t_window = Self::start_recv_window_thread( + cluster_info.read().unwrap().id(), + exit, + &blocktree, + insert_sender, + verified_receiver, + shred_filter, + bank_forks, + retransmit, + ); + + WindowService { + t_window, + t_insert, + repair_service, + } + } + + fn start_window_insert_thread( + exit: &Arc, + blocktree: &Arc, + leader_schedule_cache: &Arc, + insert_receiver: CrossbeamReceiver>, + ) -> JoinHandle<()> { let exit = exit.clone(); - let shred_filter = Arc::new(shred_filter); - let bank_forks = bank_forks.clone(); + let blocktree = blocktree.clone(); let leader_schedule_cache = leader_schedule_cache.clone(); - let t_window = Builder::new() + let mut handle_timeout = || {}; + let handle_error = || { + inc_new_counter_error!("solana-window-insert-error", 1, 1); + }; + Builder::new() + .name("solana-window-insert".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Err(e) = run_insert(&insert_receiver, &blocktree, &leader_schedule_cache) { + if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { + break; + } + } + }) + .unwrap() + } + + fn start_recv_window_thread( + id: Pubkey, + exit: &Arc, + blocktree: &Arc, + insert_sender: CrossbeamSender>, + verified_receiver: CrossbeamReceiver>, + shred_filter: F, + bank_forks: Option>>, + retransmit: PacketSender, + ) -> JoinHandle<()> + where + F: 'static + + Fn(&Pubkey, &Shred, Option>, u64) -> bool + + std::marker::Send + + std::marker::Sync, + { + let exit = exit.clone(); + let blocktree = blocktree.clone(); + Builder::new() .name("solana-window".to_string()) .spawn(move || { let _exit = Finalizer::new(exit.clone()); - let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); - let mut now = Instant::now(); let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .build() .unwrap(); + let mut now = Instant::now(); + let handle_error = || { + inc_new_counter_error!("solana-window-error", 1, 1); + }; + loop { if exit.load(Ordering::Relaxed) { break; } + let mut handle_timeout = || { + if now.elapsed() > Duration::from_secs(30) { + warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); + now = Instant::now(); + } + }; if let Err(e) = recv_window( &blocktree, + &insert_sender, &id, &verified_receiver, &retransmit, @@ -239,36 +340,40 @@ impl WindowService { ) }, &thread_pool, - &leader_schedule_cache, ) { - match e { - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => { - if now.elapsed() > Duration::from_secs(30) { - warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); - now = Instant::now(); - } - } - _ => { - inc_new_counter_error!("streamer-window-error", 1, 1); - error!("window error: {:?}", e); - } + if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { + break; } } else { now = Instant::now(); } } }) - .unwrap(); + .unwrap() + } - WindowService { - t_window, - repair_service, + fn should_exit_on_error(e: Error, handle_timeout: &mut F, handle_error: &H) -> bool + where + F: FnMut() -> (), + H: Fn() -> (), + { + match e { + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => true, + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => { + handle_timeout(); + false + } + _ => { + handle_error(); + error!("thread {:?} error {:?}", thread::current().name(), e); + false + } } } pub fn join(self) -> thread::Result<()> { self.t_window.join()?; + self.t_insert.join()?; self.repair_service.join() } } @@ -283,7 +388,6 @@ mod test { packet::{Packet, Packets}, repair_service::RepairSlotRange, }; - use crossbeam_channel::unbounded; use rand::thread_rng; use solana_ledger::shred::DataShredHeader; use solana_ledger::{