Eagerly receive records in sleepy tick producer (#21832)

This commit is contained in:
Justin Starry 2021-12-14 16:25:28 -05:00 committed by GitHub
parent 9da826421a
commit e4f7af0b48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 8 deletions

View File

@ -12,7 +12,7 @@ use {
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Mutex, Arc, Mutex,
}, },
thread::{self, sleep, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::{Duration, Instant}, time::{Duration, Instant},
}, },
}; };
@ -160,16 +160,22 @@ impl PohService {
poh_exit: &AtomicBool, poh_exit: &AtomicBool,
record_receiver: Receiver<Record>, record_receiver: Receiver<Record>,
) { ) {
let mut last_tick = Instant::now();
while !poh_exit.load(Ordering::Relaxed) { while !poh_exit.load(Ordering::Relaxed) {
let remaining_tick_time = poh_config
.target_tick_duration
.saturating_sub(last_tick.elapsed());
Self::read_record_receiver_and_process( Self::read_record_receiver_and_process(
&poh_recorder, &poh_recorder,
&record_receiver, &record_receiver,
Duration::from_millis(0), remaining_tick_time,
); );
sleep(poh_config.target_tick_duration); if remaining_tick_time.is_zero() {
last_tick = Instant::now();
poh_recorder.lock().unwrap().tick(); poh_recorder.lock().unwrap().tick();
} }
} }
}
pub fn read_record_receiver_and_process( pub fn read_record_receiver_and_process(
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -199,14 +205,23 @@ impl PohService {
record_receiver: Receiver<Record>, record_receiver: Receiver<Record>,
) { ) {
let mut warned = false; let mut warned = false;
for _ in 0..poh_config.target_tick_count.unwrap() { let mut elapsed_ticks = 0;
let mut last_tick = Instant::now();
let num_ticks = poh_config.target_tick_count.unwrap();
while elapsed_ticks < num_ticks {
let remaining_tick_time = poh_config
.target_tick_duration
.saturating_sub(last_tick.elapsed());
Self::read_record_receiver_and_process( Self::read_record_receiver_and_process(
&poh_recorder, &poh_recorder,
&record_receiver, &record_receiver,
Duration::from_millis(0), Duration::from_millis(0),
); );
sleep(poh_config.target_tick_duration); if remaining_tick_time.is_zero() {
last_tick = Instant::now();
poh_recorder.lock().unwrap().tick(); poh_recorder.lock().unwrap().tick();
elapsed_ticks += 1;
}
if poh_exit.load(Ordering::Relaxed) && !warned { if poh_exit.load(Ordering::Relaxed) && !warned {
warned = true; warned = true;
warn!("exit signal is ignored because PohService is scheduled to exit soon"); warn!("exit signal is ignored because PohService is scheduled to exit soon");
@ -370,7 +385,7 @@ mod tests {
solana_sdk::{ solana_sdk::{
clock, hash::hash, pubkey::Pubkey, timing, transaction::VersionedTransaction, clock, hash::hash, pubkey::Pubkey, timing, transaction::VersionedTransaction,
}, },
std::time::Duration, std::{thread::sleep, time::Duration},
}; };
#[test] #[test]