poll checking for new record in poh service after every batch of hashes instead of busy waiting (#16167)
* poll waiting in poh service after every batch of hashes * clippy
This commit is contained in:
parent
f4f2e781ff
commit
414c7070cb
|
@ -122,6 +122,10 @@ impl PohService {
|
|||
ticks_per_slot,
|
||||
hashes_per_batch,
|
||||
record_receiver,
|
||||
Self::target_ns_per_tick(
|
||||
ticks_per_slot,
|
||||
poh_config.target_tick_duration.as_nanos() as u64,
|
||||
),
|
||||
);
|
||||
}
|
||||
poh_exit_.store(true, Ordering::Relaxed);
|
||||
|
@ -210,6 +214,7 @@ impl PohService {
|
|||
record_receiver: &Receiver<Record>,
|
||||
hashes_per_batch: u64,
|
||||
poh: &Arc<Mutex<Poh>>,
|
||||
target_ns_per_tick: u64,
|
||||
) -> bool {
|
||||
match next_record.take() {
|
||||
Some(mut record) => {
|
||||
|
@ -254,6 +259,7 @@ impl PohService {
|
|||
timing.num_hashes += hashes_per_batch;
|
||||
let mut hash_time = Measure::start("hash");
|
||||
let should_tick = poh_l.hash(hashes_per_batch);
|
||||
let ideal_time = poh_l.target_poh_time(target_ns_per_tick);
|
||||
hash_time.stop();
|
||||
timing.total_hash_time_ns += hash_time.as_ns();
|
||||
if should_tick {
|
||||
|
@ -261,17 +267,30 @@ impl PohService {
|
|||
return true;
|
||||
}
|
||||
// check to see if a record request has been sent
|
||||
let get_again = record_receiver.try_recv();
|
||||
match get_again {
|
||||
Ok(record) => {
|
||||
if let Ok(record) = record_receiver.try_recv() {
|
||||
// remember the record we just received as the next record to occur
|
||||
*next_record = Some(record);
|
||||
break;
|
||||
}
|
||||
// check to see if we need to wait to catch up to ideal
|
||||
let wait_start = Instant::now();
|
||||
if ideal_time <= wait_start {
|
||||
// no, keep hashing. We still hold the lock.
|
||||
continue;
|
||||
}
|
||||
|
||||
// busy wait, polling for new records and after dropping poh lock (reset can occur, for example)
|
||||
drop(poh_l);
|
||||
while ideal_time > Instant::now() {
|
||||
// check to see if a record request has been sent
|
||||
if let Ok(record) = record_receiver.try_recv() {
|
||||
// remember the record we just received as the next record to occur
|
||||
*next_record = Some(record);
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
timing.total_sleep_us += wait_start.elapsed().as_micros() as u64;
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -284,6 +303,7 @@ impl PohService {
|
|||
ticks_per_slot: u64,
|
||||
hashes_per_batch: u64,
|
||||
record_receiver: Receiver<Record>,
|
||||
target_ns_per_tick: u64,
|
||||
) {
|
||||
let poh = poh_recorder.lock().unwrap().poh.clone();
|
||||
let mut timing = PohTiming::new();
|
||||
|
@ -296,6 +316,7 @@ impl PohService {
|
|||
&record_receiver,
|
||||
hashes_per_batch,
|
||||
&poh,
|
||||
target_ns_per_tick,
|
||||
);
|
||||
if should_tick {
|
||||
// Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
|
||||
|
|
Loading…
Reference in New Issue