Move window insertion to separate thread (#7397)

* Move window insertion to separate thread
This commit is contained in:
carllin 2019-12-19 00:15:49 -08:00 committed by GitHub
parent ff171baa67
commit e98132fd76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 133 additions and 29 deletions

View File

@ -6,10 +6,13 @@ use crate::packet::Packets;
use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result};
use crate::streamer::PacketSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use crossbeam_channel::{
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
};
use rayon::iter::IntoParallelRefMutIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blocktree::{self, Blocktree};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::Shred;
@ -67,14 +70,33 @@ pub fn should_retransmit_and_persist(
}
}
fn run_insert(
shred_receiver: &CrossbeamReceiver<Vec<Shred>>,
blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut shreds = shred_receiver.recv_timeout(timer)?;
while let Ok(mut more_shreds) = shred_receiver.try_recv() {
shreds.append(&mut more_shreds)
}
let blocktree_insert_metrics =
blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");
Ok(())
}
fn recv_window<F>(
blocktree: &Arc<Blocktree>,
insert_shred_sender: &CrossbeamSender<Vec<Shred>>,
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender,
shred_filter: F,
thread_pool: &ThreadPool,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Result<()>
where
F: Fn(&Shred, u64) -> bool + Sync,
@ -136,9 +158,7 @@ where
}
}
let blocktree_insert_metrics =
blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");
insert_shred_sender.send(shreds)?;
trace!(
"Elapsed processing time in recv_window(): {}",
@ -168,6 +188,7 @@ impl Drop for Finalizer {
pub struct WindowService {
t_window: JoinHandle<()>,
t_insert: JoinHandle<()>,
repair_service: RepairService,
}
@ -192,7 +213,6 @@ impl WindowService {
{
let bank_forks = match repair_strategy {
RepairStrategy::RepairRange(_) => None,
RepairStrategy::RepairAll { ref bank_forks, .. } => Some(bank_forks.clone()),
};
@ -203,28 +223,109 @@ impl WindowService {
cluster_info.clone(),
repair_strategy,
);
let (insert_sender, insert_receiver) = unbounded();
let t_insert = Self::start_window_insert_thread(
exit,
&blocktree,
leader_schedule_cache,
insert_receiver,
);
let t_window = Self::start_recv_window_thread(
cluster_info.read().unwrap().id(),
exit,
&blocktree,
insert_sender,
verified_receiver,
shred_filter,
bank_forks,
retransmit,
);
WindowService {
t_window,
t_insert,
repair_service,
}
}
fn start_window_insert_thread(
exit: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
insert_receiver: CrossbeamReceiver<Vec<Shred>>,
) -> JoinHandle<()> {
let exit = exit.clone();
let shred_filter = Arc::new(shred_filter);
let bank_forks = bank_forks.clone();
let blocktree = blocktree.clone();
let leader_schedule_cache = leader_schedule_cache.clone();
let t_window = Builder::new()
let mut handle_timeout = || {};
let handle_error = || {
inc_new_counter_error!("solana-window-insert-error", 1, 1);
};
Builder::new()
.name("solana-window-insert".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = run_insert(&insert_receiver, &blocktree, &leader_schedule_cache) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break;
}
}
})
.unwrap()
}
fn start_recv_window_thread<F>(
id: Pubkey,
exit: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
insert_sender: CrossbeamSender<Vec<Shred>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
bank_forks: Option<Arc<RwLock<BankForks>>>,
retransmit: PacketSender,
) -> JoinHandle<()>
where
F: 'static
+ Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
+ std::marker::Send
+ std::marker::Sync,
{
let exit = exit.clone();
let blocktree = blocktree.clone();
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit.clone());
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
let mut now = Instant::now();
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let mut now = Instant::now();
let handle_error = || {
inc_new_counter_error!("solana-window-error", 1, 1);
};
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let mut handle_timeout = || {
if now.elapsed() > Duration::from_secs(30) {
warn!("Window does not seem to be receiving data. Ensure port configuration is correct...");
now = Instant::now();
}
};
if let Err(e) = recv_window(
&blocktree,
&insert_sender,
&id,
&verified_receiver,
&retransmit,
@ -239,36 +340,40 @@ impl WindowService {
)
},
&thread_pool,
&leader_schedule_cache,
) {
match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => {
if now.elapsed() > Duration::from_secs(30) {
warn!("Window does not seem to be receiving data. Ensure port configuration is correct...");
now = Instant::now();
}
}
_ => {
inc_new_counter_error!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break;
}
} else {
now = Instant::now();
}
}
})
.unwrap();
.unwrap()
}
WindowService {
t_window,
repair_service,
fn should_exit_on_error<F, H>(e: Error, handle_timeout: &mut F, handle_error: &H) -> bool
where
F: FnMut() -> (),
H: Fn() -> (),
{
match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => true,
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => {
handle_timeout();
false
}
_ => {
handle_error();
error!("thread {:?} error {:?}", thread::current().name(), e);
false
}
}
}
pub fn join(self) -> thread::Result<()> {
self.t_window.join()?;
self.t_insert.join()?;
self.repair_service.join()
}
}
@ -283,7 +388,6 @@ mod test {
packet::{Packet, Packets},
repair_service::RepairSlotRange,
};
use crossbeam_channel::unbounded;
use rand::thread_rng;
use solana_ledger::shred::DataShredHeader;
use solana_ledger::{