From d09112fa6d2a28ccc708bc393032b6af65063253 Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 5 Mar 2021 16:01:21 -0800 Subject: [PATCH] PoH batch size calibration (#15717) --- core/benches/poh.rs | 6 +- core/src/banking_stage.rs | 1 + core/src/poh_recorder.rs | 53 +++++++++----- core/src/poh_service.rs | 99 ++++++++++++++++++++++---- core/src/validator.rs | 3 + local-cluster/src/validator_configs.rs | 1 + validator/src/main.rs | 10 +++ 7 files changed, 137 insertions(+), 36 deletions(-) diff --git a/core/benches/poh.rs b/core/benches/poh.rs index 726a28a970..f7a7244a97 100644 --- a/core/benches/poh.rs +++ b/core/benches/poh.rs @@ -3,7 +3,7 @@ #![feature(test)] extern crate test; -use solana_core::poh_service::NUM_HASHES_PER_BATCH; +use solana_core::poh_service::DEFAULT_HASHES_PER_BATCH; use solana_ledger::poh::Poh; use solana_sdk::hash::Hash; use std::sync::atomic::{AtomicBool, Ordering}; @@ -43,7 +43,7 @@ fn bench_arc_mutex_poh_batched_hash(bencher: &mut Bencher) { bencher.iter(|| { // NOTE: This block attempts to look as close as possible to `PohService::tick_producer()` loop { - if poh.lock().unwrap().hash(NUM_HASHES_PER_BATCH) { + if poh.lock().unwrap().hash(DEFAULT_HASHES_PER_BATCH) { poh.lock().unwrap().tick().unwrap(); if exit.load(Ordering::Relaxed) { break; @@ -58,6 +58,6 @@ fn bench_arc_mutex_poh_batched_hash(bencher: &mut Bencher) { fn bench_poh_lock_time_per_batch(bencher: &mut Bencher) { let mut poh = Poh::new(Hash::default(), None); bencher.iter(|| { - poh.hash(NUM_HASHES_PER_BATCH); + poh.hash(DEFAULT_HASHES_PER_BATCH); }) } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 157e180386..d64881ccb7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1300,6 +1300,7 @@ pub fn create_test_recorder( &exit, bank.ticks_per_slot(), poh_service::DEFAULT_PINNED_CPU_CORE, + poh_service::DEFAULT_HASHES_PER_BATCH, ); (exit, poh_recorder, poh_service, entry_receiver) diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 6fa6ea3a58..4492bf642f 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -74,6 +74,10 @@ pub struct PohRecorder { leader_schedule_cache: Arc, poh_config: Arc, ticks_per_slot: u64, + record_lock_contention_us: u64, + tick_lock_contention_us: u64, + tick_overhead_us: u64, + record_us: u64, } impl PohRecorder { @@ -349,20 +353,14 @@ impl PohRecorder { pub fn tick(&mut self) { let now = Instant::now(); let poh_entry = self.poh.lock().unwrap().tick(); - inc_new_counter_info!( - "poh_recorder-tick_lock_contention", - timing::duration_as_us(&now.elapsed()) as usize - ); + self.tick_lock_contention_us += timing::duration_as_us(&now.elapsed()); let now = Instant::now(); if let Some(poh_entry) = poh_entry { self.tick_height += 1; trace!("tick_height {}", self.tick_height); if self.leader_first_tick_height.is_none() { - inc_new_counter_info!( - "poh_recorder-tick_overhead", - timing::duration_as_us(&now.elapsed()) as usize - ); + self.tick_overhead_us += timing::duration_as_us(&now.elapsed()); return; } @@ -375,10 +373,27 @@ impl PohRecorder { self.tick_cache.push((entry, self.tick_height)); let _ = self.flush_cache(true); } - inc_new_counter_info!( - "poh_recorder-tick_overhead", - timing::duration_as_us(&now.elapsed()) as usize + self.tick_overhead_us += timing::duration_as_us(&now.elapsed()); + } + + fn report_metrics(&mut self, bank_slot: Slot) { + datapoint_info!( + "poh_recorder", + ("slot", bank_slot, i64), + ("tick_lock_contention", self.tick_lock_contention_us, i64), + ("record_us", self.record_us, i64), + ("tick_overhead", self.tick_overhead_us, i64), + ( + "record_lock_contention", + self.record_lock_contention_us, + i64 + ), ); + + self.tick_lock_contention_us = 0; + self.record_us = 0; + self.tick_overhead_us = 0; + self.record_lock_contention_us = 0; } pub fn record( @@ -398,22 +413,18 @@ impl PohRecorder { .as_ref() .ok_or(PohRecorderError::MaxHeightReached)?; if bank_slot != working_bank.bank.slot() { + self.report_metrics(bank_slot); return Err(PohRecorderError::MaxHeightReached); } { let now = Instant::now(); let mut poh_lock = self.poh.lock().unwrap(); - inc_new_counter_info!( - "poh_recorder-record_lock_contention", - timing::duration_as_us(&now.elapsed()) as usize - ); + + self.record_lock_contention_us += timing::duration_as_us(&now.elapsed()); let now = Instant::now(); let res = poh_lock.record(mixin); - inc_new_counter_info!( - "poh_recorder-record_ms", - timing::duration_as_us(&now.elapsed()) as usize - ); + self.record_us += timing::duration_as_us(&now.elapsed()); if let Some(poh_entry) = res { let entry = Entry { num_hashes: poh_entry.num_hashes, @@ -469,6 +480,10 @@ impl PohRecorder { leader_schedule_cache: leader_schedule_cache.clone(), ticks_per_slot, poh_config: poh_config.clone(), + record_lock_contention_us: 0, + tick_lock_contention_us: 0, + record_us: 0, + tick_overhead_us: 0, }, receiver, ) diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 9dc915749a..02ff4c3f4e 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -16,8 +16,8 @@ pub struct PohService { // * The larger this number is from 1, the speed of recording transactions will suffer due to lock // contention with the PoH hashing within `tick_producer()`. // -// See benches/poh.rs for some benchmarks that attempt to justify this magic number. -pub const NUM_HASHES_PER_BATCH: u64 = 1; +// Can use test_poh_service to calibrate this +pub const DEFAULT_HASHES_PER_BATCH: u64 = 64; pub const DEFAULT_PINNED_CPU_CORE: usize = 0; @@ -30,6 +30,7 @@ impl PohService { poh_exit: &Arc, ticks_per_slot: u64, pinned_cpu_core: usize, + hashes_per_batch: u64, ) -> Self { let poh_exit_ = poh_exit.clone(); let poh_config = poh_config.clone(); @@ -66,6 +67,7 @@ impl PohService { &poh_exit_, poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick, ticks_per_slot, + hashes_per_batch, ); } poh_exit_.store(true, Ordering::Relaxed); @@ -107,8 +109,8 @@ impl PohService { poh_exit: &AtomicBool, target_tick_ns: u64, ticks_per_slot: u64, + hashes_per_batch: u64, ) { - info!("starting with target ns: {}", target_tick_ns); let poh = poh_recorder.lock().unwrap().poh.clone(); let mut now = Instant::now(); let mut last_metric = Instant::now(); @@ -116,8 +118,8 @@ impl PohService { let mut num_hashes = 0; let mut total_sleep_us = 0; loop { - num_hashes += NUM_HASHES_PER_BATCH; - if poh.lock().unwrap().hash(NUM_HASHES_PER_BATCH) { + num_hashes += hashes_per_batch; + if poh.lock().unwrap().hash(hashes_per_batch) { // Lock PohRecorder only for the final hash... poh_recorder.lock().unwrap().tick(); num_ticks += 1; @@ -161,17 +163,22 @@ impl PohService { mod tests { use super::*; use crate::poh_recorder::WorkingBank; + use rand::{thread_rng, Rng}; use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; + use solana_measure::measure::Measure; use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; + use solana_sdk::clock; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; + use solana_sdk::timing; use std::time::Duration; #[test] fn test_poh_service() { + solana_logger::setup(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); @@ -179,9 +186,13 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); + + let default_target_tick_duration = + timing::duration_as_us(&PohConfig::default().target_tick_duration); + let target_tick_duration = Duration::from_micros(default_target_tick_duration); let poh_config = Arc::new(PohConfig { - hashes_per_tick: Some(2), - target_tick_duration: Duration::from_millis(42), + hashes_per_tick: Some(clock::DEFAULT_HASHES_PER_TICK), + target_tick_duration, target_tick_count: None, }); let (poh_recorder, entry_receiver) = PohRecorder::new( @@ -202,6 +213,14 @@ mod tests { min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; + let ticks_per_slot = bank.ticks_per_slot(); + + // specify RUN_TIME to run in a benchmark-like mode + // to calibrate batch size + let run_time = std::env::var("RUN_TIME") + .map(|x| x.parse().unwrap()) + .unwrap_or(0); + let is_test_run = run_time == 0; let entry_producer = { let poh_recorder = poh_recorder.clone(); @@ -210,16 +229,33 @@ mod tests { Builder::new() .name("solana-poh-service-entry_producer".to_string()) .spawn(move || { + let now = Instant::now(); + let mut total_us = 0; + let mut total_times = 0; + let h1 = hash(b"hello world!"); + let tx = test_tx(); loop { // send some data - let h1 = hash(b"hello world!"); - let tx = test_tx(); - let _ = poh_recorder - .lock() - .unwrap() - .record(bank.slot(), h1, vec![tx]); + let mut time = Measure::start("record"); + let _ = poh_recorder.lock().unwrap().record( + bank.slot(), + h1, + vec![tx.clone()], + ); + time.stop(); + total_us += time.as_us(); + total_times += 1; + if is_test_run && thread_rng().gen_ratio(1, 4) { + sleep(Duration::from_millis(200)); + } if exit.load(Ordering::Relaxed) { + info!( + "spent:{}ms record: {}ms entries recorded: {}", + now.elapsed().as_millis(), + total_us / 1000, + total_times, + ); break; } } @@ -227,12 +263,16 @@ mod tests { .unwrap() }; + let hashes_per_batch = std::env::var("HASHES_PER_BATCH") + .map(|x| x.parse().unwrap()) + .unwrap_or(DEFAULT_HASHES_PER_BATCH); let poh_service = PohService::new( poh_recorder.clone(), &poh_config, &exit, 0, DEFAULT_PINNED_CPU_CORE, + hashes_per_batch, ); poh_recorder.lock().unwrap().set_working_bank(working_bank); @@ -241,11 +281,14 @@ mod tests { let mut need_tick = true; let mut need_entry = true; let mut need_partial = true; + let mut num_ticks = 0; - while need_tick || need_entry || need_partial { + let time = Instant::now(); + while run_time != 0 || need_tick || need_entry || need_partial { let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); if entry.is_tick() { + num_ticks += 1; assert!( entry.num_hashes <= poh_config.hashes_per_tick.unwrap(), "{} <= {}", @@ -269,7 +312,35 @@ mod tests { need_entry = false; hashes += entry.num_hashes; } + + if run_time != 0 { + if time.elapsed().as_millis() > run_time { + break; + } + } else { + assert!( + time.elapsed().as_secs() < 60, + "Test should not run for this long! {}s tick {} entry {} partial {}", + time.elapsed().as_secs(), + need_tick, + need_entry, + need_partial, + ); + } } + info!( + "target_tick_duration: {} ticks_per_slot: {}", + poh_config.target_tick_duration.as_nanos(), + ticks_per_slot + ); + let elapsed = time.elapsed(); + info!( + "{} ticks in {}ms {}us/tick", + num_ticks, + elapsed.as_millis(), + elapsed.as_micros() / num_ticks + ); + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); entry_producer.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 5778f8d500..bd7c6aa979 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -122,6 +122,7 @@ pub struct ValidatorConfig { pub send_transaction_leader_forward_count: u64, pub no_poh_speed_test: bool, pub poh_pinned_cpu_core: usize, + pub poh_hashes_per_batch: u64, pub account_indexes: HashSet, pub accounts_db_caching_enabled: bool, pub warp_slot: Option, @@ -175,6 +176,7 @@ impl Default for ValidatorConfig { send_transaction_leader_forward_count: 2, no_poh_speed_test: true, poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE, + poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH, account_indexes: HashSet::new(), accounts_db_caching_enabled: false, warp_slot: None, @@ -641,6 +643,7 @@ impl Validator { &exit, bank.ticks_per_slot(), config.poh_pinned_cpu_core, + config.poh_hashes_per_batch, ); assert_eq!( blockstore.new_shreds_signals.len(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 71c472d707..61bb05c991 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -52,6 +52,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { accounts_db_use_index_hash_calculation: config.accounts_db_use_index_hash_calculation, tpu_coalesce_ms: config.tpu_coalesce_ms, validator_exit: Arc::new(RwLock::new(ValidatorExit::default())), + poh_hashes_per_batch: config.poh_hashes_per_batch, } } diff --git a/validator/src/main.rs b/validator/src/main.rs index 3f26bd60e9..93931d680a 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1606,6 +1606,14 @@ pub fn main() { }) .help("EXPERIMENTAL: Specify which CPU core PoH is pinned to"), ) + .arg( + Arg::with_name("poh_hashes_per_batch") + .hidden(true) + .long("poh-hashes-per-batch") + .takes_value(true) + .value_name("NUM") + .help("Specify hashes per batch in PoH service"), + ) .arg( Arg::with_name("account_indexes") .long("account-index") @@ -1889,6 +1897,8 @@ pub fn main() { no_poh_speed_test: matches.is_present("no_poh_speed_test"), poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core") .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE), + poh_hashes_per_batch: value_of(&matches, "poh_hashes_per_batch") + .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH), account_indexes, accounts_db_caching_enabled: !matches.is_present("no_accounts_db_caching"), accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),