PoH batch size calibration (#15717)

This commit is contained in:
sakridge 2021-03-05 16:01:21 -08:00 committed by GitHub
parent c5371fdc85
commit d09112fa6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 137 additions and 36 deletions

View File

@ -3,7 +3,7 @@
#![feature(test)]
extern crate test;
use solana_core::poh_service::NUM_HASHES_PER_BATCH;
use solana_core::poh_service::DEFAULT_HASHES_PER_BATCH;
use solana_ledger::poh::Poh;
use solana_sdk::hash::Hash;
use std::sync::atomic::{AtomicBool, Ordering};
@ -43,7 +43,7 @@ fn bench_arc_mutex_poh_batched_hash(bencher: &mut Bencher) {
bencher.iter(|| {
// NOTE: This block attempts to look as close as possible to `PohService::tick_producer()`
loop {
if poh.lock().unwrap().hash(NUM_HASHES_PER_BATCH) {
if poh.lock().unwrap().hash(DEFAULT_HASHES_PER_BATCH) {
poh.lock().unwrap().tick().unwrap();
if exit.load(Ordering::Relaxed) {
break;
@ -58,6 +58,6 @@ fn bench_arc_mutex_poh_batched_hash(bencher: &mut Bencher) {
fn bench_poh_lock_time_per_batch(bencher: &mut Bencher) {
let mut poh = Poh::new(Hash::default(), None);
bencher.iter(|| {
poh.hash(NUM_HASHES_PER_BATCH);
poh.hash(DEFAULT_HASHES_PER_BATCH);
})
}

View File

@ -1300,6 +1300,7 @@ pub fn create_test_recorder(
&exit,
bank.ticks_per_slot(),
poh_service::DEFAULT_PINNED_CPU_CORE,
poh_service::DEFAULT_HASHES_PER_BATCH,
);
(exit, poh_recorder, poh_service, entry_receiver)

View File

@ -74,6 +74,10 @@ pub struct PohRecorder {
leader_schedule_cache: Arc<LeaderScheduleCache>,
poh_config: Arc<PohConfig>,
ticks_per_slot: u64,
record_lock_contention_us: u64,
tick_lock_contention_us: u64,
tick_overhead_us: u64,
record_us: u64,
}
impl PohRecorder {
@ -349,20 +353,14 @@ impl PohRecorder {
pub fn tick(&mut self) {
let now = Instant::now();
let poh_entry = self.poh.lock().unwrap().tick();
inc_new_counter_info!(
"poh_recorder-tick_lock_contention",
timing::duration_as_us(&now.elapsed()) as usize
);
self.tick_lock_contention_us += timing::duration_as_us(&now.elapsed());
let now = Instant::now();
if let Some(poh_entry) = poh_entry {
self.tick_height += 1;
trace!("tick_height {}", self.tick_height);
if self.leader_first_tick_height.is_none() {
inc_new_counter_info!(
"poh_recorder-tick_overhead",
timing::duration_as_us(&now.elapsed()) as usize
);
self.tick_overhead_us += timing::duration_as_us(&now.elapsed());
return;
}
@ -375,10 +373,27 @@ impl PohRecorder {
self.tick_cache.push((entry, self.tick_height));
let _ = self.flush_cache(true);
}
inc_new_counter_info!(
"poh_recorder-tick_overhead",
timing::duration_as_us(&now.elapsed()) as usize
self.tick_overhead_us += timing::duration_as_us(&now.elapsed());
}
/// Emit the accumulated PoH timing counters as a single `poh_recorder`
/// datapoint tagged with `bank_slot`, then reset every counter to zero so the
/// next reporting interval starts fresh.
///
/// All counters carry a `_us` suffix in the struct and hold microseconds
/// accumulated since the previous report:
/// - `tick_lock_contention`: time spent acquiring the poh lock in `tick()`
/// - `record_us`: time spent inside `Poh::record()` within `record()`
/// - `tick_overhead`: post-lock bookkeeping time in `tick()`
/// - `record_lock_contention`: time spent acquiring the poh lock in `record()`
///
/// NOTE(review): currently invoked from `record()` when the caller's
/// `bank_slot` no longer matches the working bank (i.e. on slot rollover),
/// replacing the per-event `inc_new_counter_info!` calls with one batched
/// datapoint per slot.
fn report_metrics(&mut self, bank_slot: Slot) {
datapoint_info!(
"poh_recorder",
("slot", bank_slot, i64),
("tick_lock_contention", self.tick_lock_contention_us, i64),
("record_us", self.record_us, i64),
("tick_overhead", self.tick_overhead_us, i64),
(
"record_lock_contention",
self.record_lock_contention_us,
i64
),
);
// Reset the accumulators; reporting consumes the interval's totals.
self.tick_lock_contention_us = 0;
self.record_us = 0;
self.tick_overhead_us = 0;
self.record_lock_contention_us = 0;
}
pub fn record(
@ -398,22 +413,18 @@ impl PohRecorder {
.as_ref()
.ok_or(PohRecorderError::MaxHeightReached)?;
if bank_slot != working_bank.bank.slot() {
self.report_metrics(bank_slot);
return Err(PohRecorderError::MaxHeightReached);
}
{
let now = Instant::now();
let mut poh_lock = self.poh.lock().unwrap();
inc_new_counter_info!(
"poh_recorder-record_lock_contention",
timing::duration_as_us(&now.elapsed()) as usize
);
self.record_lock_contention_us += timing::duration_as_us(&now.elapsed());
let now = Instant::now();
let res = poh_lock.record(mixin);
inc_new_counter_info!(
"poh_recorder-record_ms",
timing::duration_as_us(&now.elapsed()) as usize
);
self.record_us += timing::duration_as_us(&now.elapsed());
if let Some(poh_entry) = res {
let entry = Entry {
num_hashes: poh_entry.num_hashes,
@ -469,6 +480,10 @@ impl PohRecorder {
leader_schedule_cache: leader_schedule_cache.clone(),
ticks_per_slot,
poh_config: poh_config.clone(),
record_lock_contention_us: 0,
tick_lock_contention_us: 0,
record_us: 0,
tick_overhead_us: 0,
},
receiver,
)

View File

@ -16,8 +16,8 @@ pub struct PohService {
// * The larger this number is from 1, the speed of recording transactions will suffer due to lock
// contention with the PoH hashing within `tick_producer()`.
//
// See benches/poh.rs for some benchmarks that attempt to justify this magic number.
pub const NUM_HASHES_PER_BATCH: u64 = 1;
// Can use test_poh_service to calibrate this
pub const DEFAULT_HASHES_PER_BATCH: u64 = 64;
pub const DEFAULT_PINNED_CPU_CORE: usize = 0;
@ -30,6 +30,7 @@ impl PohService {
poh_exit: &Arc<AtomicBool>,
ticks_per_slot: u64,
pinned_cpu_core: usize,
hashes_per_batch: u64,
) -> Self {
let poh_exit_ = poh_exit.clone();
let poh_config = poh_config.clone();
@ -66,6 +67,7 @@ impl PohService {
&poh_exit_,
poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick,
ticks_per_slot,
hashes_per_batch,
);
}
poh_exit_.store(true, Ordering::Relaxed);
@ -107,8 +109,8 @@ impl PohService {
poh_exit: &AtomicBool,
target_tick_ns: u64,
ticks_per_slot: u64,
hashes_per_batch: u64,
) {
info!("starting with target ns: {}", target_tick_ns);
let poh = poh_recorder.lock().unwrap().poh.clone();
let mut now = Instant::now();
let mut last_metric = Instant::now();
@ -116,8 +118,8 @@ impl PohService {
let mut num_hashes = 0;
let mut total_sleep_us = 0;
loop {
num_hashes += NUM_HASHES_PER_BATCH;
if poh.lock().unwrap().hash(NUM_HASHES_PER_BATCH) {
num_hashes += hashes_per_batch;
if poh.lock().unwrap().hash(hashes_per_batch) {
// Lock PohRecorder only for the final hash...
poh_recorder.lock().unwrap().tick();
num_ticks += 1;
@ -161,17 +163,22 @@ impl PohService {
mod tests {
use super::*;
use crate::poh_recorder::WorkingBank;
use rand::{thread_rng, Rng};
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
use solana_measure::measure::Measure;
use solana_perf::test_tx::test_tx;
use solana_runtime::bank::Bank;
use solana_sdk::clock;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing;
use std::time::Duration;
#[test]
fn test_poh_service() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new(&genesis_config));
let prev_hash = bank.last_blockhash();
@ -179,9 +186,13 @@ mod tests {
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let default_target_tick_duration =
timing::duration_as_us(&PohConfig::default().target_tick_duration);
let target_tick_duration = Duration::from_micros(default_target_tick_duration);
let poh_config = Arc::new(PohConfig {
hashes_per_tick: Some(2),
target_tick_duration: Duration::from_millis(42),
hashes_per_tick: Some(clock::DEFAULT_HASHES_PER_TICK),
target_tick_duration,
target_tick_count: None,
});
let (poh_recorder, entry_receiver) = PohRecorder::new(
@ -202,6 +213,14 @@ mod tests {
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
let ticks_per_slot = bank.ticks_per_slot();
// specify RUN_TIME to run in a benchmark-like mode
// to calibrate batch size
let run_time = std::env::var("RUN_TIME")
.map(|x| x.parse().unwrap())
.unwrap_or(0);
let is_test_run = run_time == 0;
let entry_producer = {
let poh_recorder = poh_recorder.clone();
@ -210,16 +229,33 @@ mod tests {
Builder::new()
.name("solana-poh-service-entry_producer".to_string())
.spawn(move || {
let now = Instant::now();
let mut total_us = 0;
let mut total_times = 0;
let h1 = hash(b"hello world!");
let tx = test_tx();
loop {
// send some data
let h1 = hash(b"hello world!");
let tx = test_tx();
let _ = poh_recorder
.lock()
.unwrap()
.record(bank.slot(), h1, vec![tx]);
let mut time = Measure::start("record");
let _ = poh_recorder.lock().unwrap().record(
bank.slot(),
h1,
vec![tx.clone()],
);
time.stop();
total_us += time.as_us();
total_times += 1;
if is_test_run && thread_rng().gen_ratio(1, 4) {
sleep(Duration::from_millis(200));
}
if exit.load(Ordering::Relaxed) {
info!(
"spent:{}ms record: {}ms entries recorded: {}",
now.elapsed().as_millis(),
total_us / 1000,
total_times,
);
break;
}
}
@ -227,12 +263,16 @@ mod tests {
.unwrap()
};
let hashes_per_batch = std::env::var("HASHES_PER_BATCH")
.map(|x| x.parse().unwrap())
.unwrap_or(DEFAULT_HASHES_PER_BATCH);
let poh_service = PohService::new(
poh_recorder.clone(),
&poh_config,
&exit,
0,
DEFAULT_PINNED_CPU_CORE,
hashes_per_batch,
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
@ -241,11 +281,14 @@ mod tests {
let mut need_tick = true;
let mut need_entry = true;
let mut need_partial = true;
let mut num_ticks = 0;
while need_tick || need_entry || need_partial {
let time = Instant::now();
while run_time != 0 || need_tick || need_entry || need_partial {
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
if entry.is_tick() {
num_ticks += 1;
assert!(
entry.num_hashes <= poh_config.hashes_per_tick.unwrap(),
"{} <= {}",
@ -269,7 +312,35 @@ mod tests {
need_entry = false;
hashes += entry.num_hashes;
}
if run_time != 0 {
if time.elapsed().as_millis() > run_time {
break;
}
} else {
assert!(
time.elapsed().as_secs() < 60,
"Test should not run for this long! {}s tick {} entry {} partial {}",
time.elapsed().as_secs(),
need_tick,
need_entry,
need_partial,
);
}
}
info!(
"target_tick_duration: {} ticks_per_slot: {}",
poh_config.target_tick_duration.as_nanos(),
ticks_per_slot
);
let elapsed = time.elapsed();
info!(
"{} ticks in {}ms {}us/tick",
num_ticks,
elapsed.as_millis(),
elapsed.as_micros() / num_ticks
);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
entry_producer.join().unwrap();

View File

@ -122,6 +122,7 @@ pub struct ValidatorConfig {
pub send_transaction_leader_forward_count: u64,
pub no_poh_speed_test: bool,
pub poh_pinned_cpu_core: usize,
pub poh_hashes_per_batch: u64,
pub account_indexes: HashSet<AccountIndex>,
pub accounts_db_caching_enabled: bool,
pub warp_slot: Option<Slot>,
@ -175,6 +176,7 @@ impl Default for ValidatorConfig {
send_transaction_leader_forward_count: 2,
no_poh_speed_test: true,
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
account_indexes: HashSet::new(),
accounts_db_caching_enabled: false,
warp_slot: None,
@ -641,6 +643,7 @@ impl Validator {
&exit,
bank.ticks_per_slot(),
config.poh_pinned_cpu_core,
config.poh_hashes_per_batch,
);
assert_eq!(
blockstore.new_shreds_signals.len(),

View File

@ -52,6 +52,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
accounts_db_use_index_hash_calculation: config.accounts_db_use_index_hash_calculation,
tpu_coalesce_ms: config.tpu_coalesce_ms,
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
}
}

View File

@ -1606,6 +1606,14 @@ pub fn main() {
})
.help("EXPERIMENTAL: Specify which CPU core PoH is pinned to"),
)
.arg(
Arg::with_name("poh_hashes_per_batch")
.hidden(true)
.long("poh-hashes-per-batch")
.takes_value(true)
.value_name("NUM")
.help("Specify hashes per batch in PoH service"),
)
.arg(
Arg::with_name("account_indexes")
.long("account-index")
@ -1889,6 +1897,8 @@ pub fn main() {
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core")
.unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
poh_hashes_per_batch: value_of(&matches, "poh_hashes_per_batch")
.unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH),
account_indexes,
accounts_db_caching_enabled: !matches.is_present("no_accounts_db_caching"),
accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),