Throttle PoH ticks by cumulative slot time (#16139)
* Throttle PoH ticks by cumulative slot time * respond to pr feedback * saturating sub * updated comment
This commit is contained in:
parent
a1f1f573d5
commit
4f4cffbd03
|
@ -10,6 +10,7 @@
|
||||||
//! For Entries:
|
//! For Entries:
|
||||||
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
|
//! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height
|
||||||
//!
|
//!
|
||||||
|
use crate::poh_service::PohService;
|
||||||
use solana_ledger::blockstore::Blockstore;
|
use solana_ledger::blockstore::Blockstore;
|
||||||
use solana_ledger::entry::Entry;
|
use solana_ledger::entry::Entry;
|
||||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||||
|
@ -151,6 +152,7 @@ pub struct PohRecorder {
|
||||||
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||||
poh_config: Arc<PohConfig>,
|
poh_config: Arc<PohConfig>,
|
||||||
ticks_per_slot: u64,
|
ticks_per_slot: u64,
|
||||||
|
target_ns_per_tick: u64,
|
||||||
record_lock_contention_us: u64,
|
record_lock_contention_us: u64,
|
||||||
flush_cache_no_tick_us: u64,
|
flush_cache_no_tick_us: u64,
|
||||||
flush_cache_tick_us: u64,
|
flush_cache_tick_us: u64,
|
||||||
|
@ -158,6 +160,7 @@ pub struct PohRecorder {
|
||||||
send_us: u64,
|
send_us: u64,
|
||||||
tick_lock_contention_us: u64,
|
tick_lock_contention_us: u64,
|
||||||
tick_overhead_us: u64,
|
tick_overhead_us: u64,
|
||||||
|
total_sleep_us: u64,
|
||||||
record_us: u64,
|
record_us: u64,
|
||||||
ticks_from_record: u64,
|
ticks_from_record: u64,
|
||||||
last_metric: Instant,
|
last_metric: Instant,
|
||||||
|
@ -462,7 +465,17 @@ impl PohRecorder {
|
||||||
|
|
||||||
pub fn tick(&mut self) {
|
pub fn tick(&mut self) {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let poh_entry = self.poh.lock().unwrap().tick();
|
let (poh_entry, target_time) = {
|
||||||
|
let mut poh_l = self.poh.lock().unwrap();
|
||||||
|
let poh_entry = poh_l.tick();
|
||||||
|
let target_time = if poh_entry.is_some() {
|
||||||
|
Some(poh_l.target_poh_time(self.target_ns_per_tick))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
(poh_entry, target_time)
|
||||||
|
};
|
||||||
self.tick_lock_contention_us += timing::duration_as_us(&now.elapsed());
|
self.tick_lock_contention_us += timing::duration_as_us(&now.elapsed());
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
if let Some(poh_entry) = poh_entry {
|
if let Some(poh_entry) = poh_entry {
|
||||||
|
@ -485,6 +498,15 @@ impl PohRecorder {
|
||||||
self.tick_cache.push((entry, self.tick_height));
|
self.tick_cache.push((entry, self.tick_height));
|
||||||
let _ = self.flush_cache(true);
|
let _ = self.flush_cache(true);
|
||||||
self.flush_cache_tick_us += timing::duration_as_us(&now.elapsed());
|
self.flush_cache_tick_us += timing::duration_as_us(&now.elapsed());
|
||||||
|
let target_time = target_time.unwrap();
|
||||||
|
// sleep is not accurate enough to get a predictable time.
|
||||||
|
// Kernel can not schedule the thread for a while.
|
||||||
|
let started_waiting = Instant::now();
|
||||||
|
while Instant::now() < target_time {
|
||||||
|
// TODO: a caller could possibly desire to reset or record while we're spinning here
|
||||||
|
std::hint::spin_loop();
|
||||||
|
}
|
||||||
|
self.total_sleep_us += started_waiting.elapsed().as_nanos() as u64 / 1000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,6 +522,7 @@ impl PohRecorder {
|
||||||
("prepare_send_us", self.prepare_send_us, i64),
|
("prepare_send_us", self.prepare_send_us, i64),
|
||||||
("send_us", self.send_us, i64),
|
("send_us", self.send_us, i64),
|
||||||
("ticks_from_record", self.ticks_from_record, i64),
|
("ticks_from_record", self.ticks_from_record, i64),
|
||||||
|
("total_sleep_us", self.total_sleep_us, i64),
|
||||||
("tick_overhead", self.tick_overhead_us, i64),
|
("tick_overhead", self.tick_overhead_us, i64),
|
||||||
(
|
(
|
||||||
"record_lock_contention",
|
"record_lock_contention",
|
||||||
|
@ -511,6 +534,7 @@ impl PohRecorder {
|
||||||
self.tick_lock_contention_us = 0;
|
self.tick_lock_contention_us = 0;
|
||||||
self.record_us = 0;
|
self.record_us = 0;
|
||||||
self.tick_overhead_us = 0;
|
self.tick_overhead_us = 0;
|
||||||
|
self.total_sleep_us = 0;
|
||||||
self.record_lock_contention_us = 0;
|
self.record_lock_contention_us = 0;
|
||||||
self.flush_cache_no_tick_us = 0;
|
self.flush_cache_no_tick_us = 0;
|
||||||
self.flush_cache_tick_us = 0;
|
self.flush_cache_tick_us = 0;
|
||||||
|
@ -588,10 +612,18 @@ impl PohRecorder {
|
||||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||||
poh_config: &Arc<PohConfig>,
|
poh_config: &Arc<PohConfig>,
|
||||||
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
|
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
|
||||||
let poh = Arc::new(Mutex::new(Poh::new(
|
let tick_number = 0;
|
||||||
|
let poh = Arc::new(Mutex::new(Poh::new_with_slot_info(
|
||||||
last_entry_hash,
|
last_entry_hash,
|
||||||
poh_config.hashes_per_tick,
|
poh_config.hashes_per_tick,
|
||||||
|
ticks_per_slot,
|
||||||
|
tick_number,
|
||||||
)));
|
)));
|
||||||
|
|
||||||
|
let target_ns_per_tick = PohService::target_ns_per_tick(
|
||||||
|
ticks_per_slot,
|
||||||
|
poh_config.target_tick_duration.as_nanos() as u64,
|
||||||
|
);
|
||||||
let (sender, receiver) = channel();
|
let (sender, receiver) = channel();
|
||||||
let (record_sender, record_receiver) = channel();
|
let (record_sender, record_receiver) = channel();
|
||||||
let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
|
let (leader_first_tick_height, leader_last_tick_height, grace_ticks) =
|
||||||
|
@ -613,6 +645,7 @@ impl PohRecorder {
|
||||||
blockstore: blockstore.clone(),
|
blockstore: blockstore.clone(),
|
||||||
leader_schedule_cache: leader_schedule_cache.clone(),
|
leader_schedule_cache: leader_schedule_cache.clone(),
|
||||||
ticks_per_slot,
|
ticks_per_slot,
|
||||||
|
target_ns_per_tick,
|
||||||
poh_config: poh_config.clone(),
|
poh_config: poh_config.clone(),
|
||||||
record_lock_contention_us: 0,
|
record_lock_contention_us: 0,
|
||||||
flush_cache_tick_us: 0,
|
flush_cache_tick_us: 0,
|
||||||
|
@ -622,6 +655,7 @@ impl PohRecorder {
|
||||||
tick_lock_contention_us: 0,
|
tick_lock_contention_us: 0,
|
||||||
record_us: 0,
|
record_us: 0,
|
||||||
tick_overhead_us: 0,
|
tick_overhead_us: 0,
|
||||||
|
total_sleep_us: 0,
|
||||||
ticks_from_record: 0,
|
ticks_from_record: 0,
|
||||||
last_metric: Instant::now(),
|
last_metric: Instant::now(),
|
||||||
record_sender,
|
record_sender,
|
||||||
|
|
|
@ -116,17 +116,9 @@ impl PohService {
|
||||||
if let Some(cores) = core_affinity::get_core_ids() {
|
if let Some(cores) = core_affinity::get_core_ids() {
|
||||||
core_affinity::set_for_current(cores[pinned_cpu_core]);
|
core_affinity::set_for_current(cores[pinned_cpu_core]);
|
||||||
}
|
}
|
||||||
// Account for some extra time outside of PoH generation to account
|
|
||||||
// for processing time outside PoH.
|
|
||||||
let adjustment_per_tick = if ticks_per_slot > 0 {
|
|
||||||
TARGET_SLOT_ADJUSTMENT_NS / ticks_per_slot
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
Self::tick_producer(
|
Self::tick_producer(
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
&poh_exit_,
|
&poh_exit_,
|
||||||
poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick,
|
|
||||||
ticks_per_slot,
|
ticks_per_slot,
|
||||||
hashes_per_batch,
|
hashes_per_batch,
|
||||||
record_receiver,
|
record_receiver,
|
||||||
|
@ -139,6 +131,17 @@ impl PohService {
|
||||||
Self { tick_producer }
|
Self { tick_producer }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn target_ns_per_tick(ticks_per_slot: u64, target_tick_duration_ns: u64) -> u64 {
|
||||||
|
// Account for some extra time outside of PoH generation to account
|
||||||
|
// for processing time outside PoH.
|
||||||
|
let adjustment_per_tick = if ticks_per_slot > 0 {
|
||||||
|
TARGET_SLOT_ADJUSTMENT_NS / ticks_per_slot
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
target_tick_duration_ns.saturating_sub(adjustment_per_tick)
|
||||||
|
}
|
||||||
|
|
||||||
fn sleepy_tick_producer(
|
fn sleepy_tick_producer(
|
||||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
poh_config: &PohConfig,
|
poh_config: &PohConfig,
|
||||||
|
@ -199,6 +202,7 @@ impl PohService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// returns true if we need to tick
|
||||||
fn record_or_hash(
|
fn record_or_hash(
|
||||||
next_record: &mut Option<Record>,
|
next_record: &mut Option<Record>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
|
@ -253,7 +257,8 @@ impl PohService {
|
||||||
hash_time.stop();
|
hash_time.stop();
|
||||||
timing.total_hash_time_ns += hash_time.as_ns();
|
timing.total_hash_time_ns += hash_time.as_ns();
|
||||||
if should_tick {
|
if should_tick {
|
||||||
return true; // nothing else can be done. tick required.
|
// nothing else can be done. tick required.
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
// check to see if a record request has been sent
|
// check to see if a record request has been sent
|
||||||
let get_again = record_receiver.try_recv();
|
let get_again = record_receiver.try_recv();
|
||||||
|
@ -276,13 +281,11 @@ impl PohService {
|
||||||
fn tick_producer(
|
fn tick_producer(
|
||||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||||
poh_exit: &AtomicBool,
|
poh_exit: &AtomicBool,
|
||||||
target_tick_ns: u64,
|
|
||||||
ticks_per_slot: u64,
|
ticks_per_slot: u64,
|
||||||
hashes_per_batch: u64,
|
hashes_per_batch: u64,
|
||||||
record_receiver: Receiver<Record>,
|
record_receiver: Receiver<Record>,
|
||||||
) {
|
) {
|
||||||
let poh = poh_recorder.lock().unwrap().poh.clone();
|
let poh = poh_recorder.lock().unwrap().poh.clone();
|
||||||
let mut now = Instant::now();
|
|
||||||
let mut timing = PohTiming::new();
|
let mut timing = PohTiming::new();
|
||||||
let mut next_record = None;
|
let mut next_record = None;
|
||||||
loop {
|
loop {
|
||||||
|
@ -307,14 +310,6 @@ impl PohService {
|
||||||
timing.total_tick_time_ns += tick_time.as_ns();
|
timing.total_tick_time_ns += tick_time.as_ns();
|
||||||
}
|
}
|
||||||
timing.num_ticks += 1;
|
timing.num_ticks += 1;
|
||||||
let elapsed_ns = now.elapsed().as_nanos() as u64;
|
|
||||||
// sleep is not accurate enough to get a predictable time.
|
|
||||||
// Kernel can not schedule the thread for a while.
|
|
||||||
while (now.elapsed().as_nanos() as u64) < target_tick_ns {
|
|
||||||
std::hint::spin_loop();
|
|
||||||
}
|
|
||||||
timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
|
|
||||||
now = Instant::now();
|
|
||||||
|
|
||||||
timing.report(ticks_per_slot);
|
timing.report(ticks_per_slot);
|
||||||
if poh_exit.load(Ordering::Relaxed) {
|
if poh_exit.load(Ordering::Relaxed) {
|
||||||
|
|
|
@ -8,6 +8,9 @@ pub struct Poh {
|
||||||
num_hashes: u64,
|
num_hashes: u64,
|
||||||
hashes_per_tick: u64,
|
hashes_per_tick: u64,
|
||||||
remaining_hashes: u64,
|
remaining_hashes: u64,
|
||||||
|
ticks_per_slot: u64,
|
||||||
|
tick_number: u64,
|
||||||
|
slot_start_time: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -18,23 +21,47 @@ pub struct PohEntry {
|
||||||
|
|
||||||
impl Poh {
|
impl Poh {
|
||||||
pub fn new(hash: Hash, hashes_per_tick: Option<u64>) -> Self {
|
pub fn new(hash: Hash, hashes_per_tick: Option<u64>) -> Self {
|
||||||
|
Self::new_with_slot_info(hash, hashes_per_tick, 0, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new_with_slot_info(
|
||||||
|
hash: Hash,
|
||||||
|
hashes_per_tick: Option<u64>,
|
||||||
|
ticks_per_slot: u64,
|
||||||
|
tick_number: u64,
|
||||||
|
) -> Self {
|
||||||
let hashes_per_tick = hashes_per_tick.unwrap_or(std::u64::MAX);
|
let hashes_per_tick = hashes_per_tick.unwrap_or(std::u64::MAX);
|
||||||
assert!(hashes_per_tick > 1);
|
assert!(hashes_per_tick > 1);
|
||||||
|
let now = Instant::now();
|
||||||
Poh {
|
Poh {
|
||||||
hash,
|
hash,
|
||||||
num_hashes: 0,
|
num_hashes: 0,
|
||||||
hashes_per_tick,
|
hashes_per_tick,
|
||||||
remaining_hashes: hashes_per_tick,
|
remaining_hashes: hashes_per_tick,
|
||||||
|
ticks_per_slot,
|
||||||
|
tick_number,
|
||||||
|
slot_start_time: now,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset(&mut self, hash: Hash, hashes_per_tick: Option<u64>) {
|
pub fn reset(&mut self, hash: Hash, hashes_per_tick: Option<u64>) {
|
||||||
let mut poh = Poh::new(hash, hashes_per_tick);
|
// retains ticks_per_slot: this cannot change without restarting the validator
|
||||||
|
let tick_number = 0;
|
||||||
|
let mut poh =
|
||||||
|
Poh::new_with_slot_info(hash, hashes_per_tick, self.ticks_per_slot, tick_number);
|
||||||
std::mem::swap(&mut poh, self);
|
std::mem::swap(&mut poh, self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn target_poh_time(&self, target_ns_per_tick: u64) -> Instant {
|
||||||
|
assert!(self.hashes_per_tick > 0);
|
||||||
|
let offset_tick_ns = target_ns_per_tick * self.tick_number;
|
||||||
|
let offset_ns = target_ns_per_tick * self.num_hashes / self.hashes_per_tick;
|
||||||
|
self.slot_start_time + Duration::from_nanos(offset_ns + offset_tick_ns)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn hash(&mut self, max_num_hashes: u64) -> bool {
|
pub fn hash(&mut self, max_num_hashes: u64) -> bool {
|
||||||
let num_hashes = std::cmp::min(self.remaining_hashes - 1, max_num_hashes);
|
let num_hashes = std::cmp::min(self.remaining_hashes - 1, max_num_hashes);
|
||||||
|
|
||||||
for _ in 0..num_hashes {
|
for _ in 0..num_hashes {
|
||||||
self.hash = hash(&self.hash.as_ref());
|
self.hash = hash(&self.hash.as_ref());
|
||||||
}
|
}
|
||||||
|
@ -75,6 +102,7 @@ impl Poh {
|
||||||
let num_hashes = self.num_hashes;
|
let num_hashes = self.num_hashes;
|
||||||
self.remaining_hashes = self.hashes_per_tick;
|
self.remaining_hashes = self.hashes_per_tick;
|
||||||
self.num_hashes = 0;
|
self.num_hashes = 0;
|
||||||
|
self.tick_number += 1;
|
||||||
Some(PohEntry {
|
Some(PohEntry {
|
||||||
num_hashes,
|
num_hashes,
|
||||||
hash: self.hash,
|
hash: self.hash,
|
||||||
|
@ -102,6 +130,7 @@ mod tests {
|
||||||
use crate::poh::{Poh, PohEntry};
|
use crate::poh::{Poh, PohEntry};
|
||||||
use matches::assert_matches;
|
use matches::assert_matches;
|
||||||
use solana_sdk::hash::{hash, hashv, Hash};
|
use solana_sdk::hash::{hash, hashv, Hash};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
fn verify(initial_hash: Hash, entries: &[(PohEntry, Option<Hash>)]) -> bool {
|
fn verify(initial_hash: Hash, entries: &[(PohEntry, Option<Hash>)]) -> bool {
|
||||||
let mut current_hash = initial_hash;
|
let mut current_hash = initial_hash;
|
||||||
|
@ -124,6 +153,42 @@ mod tests {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_target_poh_time() {
|
||||||
|
let zero = Hash::default();
|
||||||
|
for target_ns_per_tick in 10..12 {
|
||||||
|
let mut poh = Poh::new(zero, None);
|
||||||
|
assert_eq!(poh.target_poh_time(target_ns_per_tick), poh.slot_start_time);
|
||||||
|
poh.tick_number = 2;
|
||||||
|
assert_eq!(
|
||||||
|
poh.target_poh_time(target_ns_per_tick),
|
||||||
|
poh.slot_start_time + Duration::from_nanos(target_ns_per_tick * 2)
|
||||||
|
);
|
||||||
|
let mut poh = Poh::new(zero, Some(5));
|
||||||
|
assert_eq!(poh.target_poh_time(target_ns_per_tick), poh.slot_start_time);
|
||||||
|
poh.tick_number = 2;
|
||||||
|
assert_eq!(
|
||||||
|
poh.target_poh_time(target_ns_per_tick),
|
||||||
|
poh.slot_start_time + Duration::from_nanos(target_ns_per_tick * 2)
|
||||||
|
);
|
||||||
|
poh.num_hashes = 3;
|
||||||
|
assert_eq!(
|
||||||
|
poh.target_poh_time(target_ns_per_tick),
|
||||||
|
poh.slot_start_time
|
||||||
|
+ Duration::from_nanos(target_ns_per_tick * 2 + target_ns_per_tick * 3 / 5)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic(expected = "assertion failed: hashes_per_tick > 1")]
|
||||||
|
fn test_target_poh_time_hashes_per_tick() {
|
||||||
|
let zero = Hash::default();
|
||||||
|
let poh = Poh::new(zero, Some(0));
|
||||||
|
let target_ns_per_tick = 10;
|
||||||
|
poh.target_poh_time(target_ns_per_tick);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_poh_verify() {
|
fn test_poh_verify() {
|
||||||
let zero = Hash::default();
|
let zero = Hash::default();
|
||||||
|
|
Loading…
Reference in New Issue