From ba770832d0abf691db7f8fde248466d5eee49bfa Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Wed, 30 Mar 2022 09:04:49 -0500 Subject: [PATCH] Poh timing service (#23736) * initial work for poh timing report service * add poh_timing_report_service to validator * fix comments * clippy * imrove test coverage * delete record when complete * rename shred full to slot full. * debug logging * fix slot full * remove debug comments * adding fmt trait * derive default * default for poh timing reporter * better comments * remove commented code * fix test * more test fixes * delete timestamps for slot that are older than root_slot * debug log * record poh start end in bank reset * report full to start time instead * fix poh slot offset * report poh start for normal ticks * fix typo * refactor out poh point report fn * rename * optimize delete - delete only when last_root changed * change log level to trace * convert if to match * remove redudant check * fix SlotPohTiming comments * review feedback on poh timing reporter * review feedback on poh_recorder * add test case for out-of-order arrival of timing points and incomplete timing points * refactor poh_timing_points into its own mod * remove option for poh_timing_report service * move poh_timing_point_sender to constructor * clippy * better comments * more clippy * more clippy * add slot poh timing point macro * clippy * assert in test * comments and display fmt * fix check * assert format * revise comments * refactor * extrac send fn * revert reporting_poh_timing_point * align loggin * small refactor * move type declaration to the top of the module * replace macro with constructor * clippy: remove redundant closure * review comments * simplify poh timing point creation Co-authored-by: Haoran Yi --- core/src/lib.rs | 2 + core/src/poh_timing_report_service.rs | 87 ++++++++++ core/src/poh_timing_reporter.rs | 239 ++++++++++++++++++++++++++ core/src/validator.rs | 18 +- ledger/src/blockstore.rs | 28 ++- metrics/src/lib.rs | 1 + metrics/src/poh_timing_point.rs | 173 +++++++++++++++++++ poh/src/poh_recorder.rs | 104 +++++++++++ 8 files changed, 649 insertions(+), 3 deletions(-) create mode 100644 core/src/poh_timing_report_service.rs create mode 100644 core/src/poh_timing_reporter.rs create mode 100644 metrics/src/poh_timing_point.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index e4032def4..abc843bbd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -37,6 +37,8 @@ pub mod optimistic_confirmation_verifier; pub mod outstanding_requests; pub mod packet_hasher; pub mod packet_threshold; +pub mod poh_timing_report_service; +pub mod poh_timing_reporter; pub mod progress_map; pub mod qos_service; pub mod repair_generic_traversal; diff --git a/core/src/poh_timing_report_service.rs b/core/src/poh_timing_report_service.rs new file mode 100644 index 000000000..f2e43da07 --- /dev/null +++ b/core/src/poh_timing_report_service.rs @@ -0,0 +1,87 @@ +//! PohTimingReportService module +use { + crate::poh_timing_reporter::PohTimingReporter, + solana_metrics::poh_timing_point::{PohTimingReceiver, SlotPohTimingInfo}, + std::{ + string::ToString, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +/// Timeout to wait on the poh timing points from the channel +const POH_TIMING_RECEIVER_TIMEOUT_MILLISECONDS: u64 = 1000; + +/// The `poh_timing_report_service` receives signals of relevant timing points +/// during the processing of a slot, (i.e. 
from blockstore and poh), aggregate and +/// report the result as datapoints. +pub struct PohTimingReportService { + t_poh_timing: JoinHandle<()>, +} + +impl PohTimingReportService { + pub fn new(receiver: PohTimingReceiver, exit: Arc) -> Self { + let exit_signal = exit; + let mut poh_timing_reporter = PohTimingReporter::default(); + let t_poh_timing = Builder::new() + .name("poh_timing_report".to_string()) + .spawn(move || loop { + if exit_signal.load(Ordering::Relaxed) { + break; + } + if let Ok(SlotPohTimingInfo { + slot, + root_slot, + timing_point, + }) = receiver.recv_timeout(Duration::from_millis( + POH_TIMING_RECEIVER_TIMEOUT_MILLISECONDS, + )) { + poh_timing_reporter.process(slot, root_slot, timing_point); + } + }) + .unwrap(); + Self { t_poh_timing } + } + + pub fn join(self) -> thread::Result<()> { + self.t_poh_timing.join() + } +} + +#[cfg(test)] +mod test { + use { + super::*, crossbeam_channel::unbounded, solana_metrics::poh_timing_point::SlotPohTimingInfo, + }; + + #[test] + /// Test the life cycle of the PohTimingReportService + fn test_poh_timing_report_service() { + let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded(); + let exit = Arc::new(AtomicBool::new(false)); + // Create the service + let poh_timing_report_service = + PohTimingReportService::new(poh_timing_point_receiver, exit.clone()); + + // Send SlotPohTimingPoint + let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_start_poh_time_point( + 42, None, 100, + )); + let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_end_poh_time_point( + 42, None, 200, + )); + let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_full_poh_time_point( + 42, None, 150, + )); + + // Shutdown the service + exit.store(true, Ordering::Relaxed); + poh_timing_report_service + .join() + .expect("poh_timing_report_service completed"); + } +} diff --git a/core/src/poh_timing_reporter.rs b/core/src/poh_timing_reporter.rs new file mode 100644 index 000000000..76f977386 --- /dev/null +++ b/core/src/poh_timing_reporter.rs @@ -0,0 +1,239 @@ +//! A poh_timing_reporter module implement poh timing point and timing reporter +//! structs. +use { + solana_metrics::{datapoint_info, poh_timing_point::PohTimingPoint}, + solana_sdk::clock::Slot, + std::{collections::HashMap, fmt}, +}; + +/// A SlotPohTimestamp records timing of the events during the processing of a +/// slot by the validator +#[derive(Debug, Clone, Copy, Default)] +pub struct SlotPohTimestamp { + /// Slot start time from poh + pub start_time: u64, + /// Slot end time from poh + pub end_time: u64, + /// Last shred received time from block producer + pub full_time: u64, +} + +/// Display trait +impl fmt::Display for SlotPohTimestamp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "SlotPohTimestamp: start={} end={} full={}", + self.start_time, self.end_time, self.full_time + ) + } +} + +impl SlotPohTimestamp { + /// Return true if the timing points of all events are received. 
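+    // Note (added comment, not part of the original patch): a zero timestamp acts
+    // as the "not yet received" sentinel; SlotPohTimestamp derives Default, so all
+    // three fields start at 0 and a slot reports only once every point is non-zero.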
+ pub fn is_complete(&self) -> bool { + self.start_time != 0 && self.end_time != 0 && self.full_time != 0 + } + + /// Update with timing point + pub fn update(&mut self, timing_point: PohTimingPoint) { + match timing_point { + PohTimingPoint::PohSlotStart(ts) => self.start_time = ts, + PohTimingPoint::PohSlotEnd(ts) => self.end_time = ts, + PohTimingPoint::FullSlotReceived(ts) => self.full_time = ts, + } + } + + /// Return the time difference from slot start to slot full + fn slot_start_to_full_time(&self) -> i64 { + (self.full_time as i64).saturating_sub(self.start_time as i64) + } + + /// Return the time difference from slot full to slot end + fn slot_full_to_end_time(&self) -> i64 { + (self.end_time as i64).saturating_sub(self.full_time as i64) + } + + /// Report PohTiming for a slot + pub fn report(&self, slot: Slot) { + datapoint_info!( + "poh_slot_timing", + ("slot", slot as i64, i64), + ("start_time", self.start_time as i64, i64), + ("end_time", self.end_time as i64, i64), + ("full_time", self.full_time as i64, i64), + ( + "start_to_full_time_diff", + self.slot_start_to_full_time(), + i64 + ), + ("full_to_end_time_diff", self.slot_full_to_end_time(), i64), + ); + } +} + +/// A PohTimingReporter manages and reports the timing of events for incoming +/// slots +#[derive(Default)] +pub struct PohTimingReporter { + /// Storage map of SlotPohTimestamp per slot + slot_timestamps: HashMap, + last_root_slot: Slot, +} + +impl PohTimingReporter { + /// Return true if PohTiming is complete for the slot + pub fn is_complete(&self, slot: Slot) -> bool { + if let Some(slot_timestamp) = self.slot_timestamps.get(&slot) { + slot_timestamp.is_complete() + } else { + false + } + } + + /// Process incoming PohTimingPoint from the channel + pub fn process(&mut self, slot: Slot, root_slot: Option, t: PohTimingPoint) -> bool { + let slot_timestamp = self + .slot_timestamps + .entry(slot) + .or_insert_with(SlotPohTimestamp::default); + + slot_timestamp.update(t); + let is_completed = slot_timestamp.is_complete(); + if is_completed { + slot_timestamp.report(slot); + } + + // delete slots that are older than the root_slot + if let Some(root_slot) = root_slot { + if root_slot > self.last_root_slot { + self.slot_timestamps.retain(|&k, _| k >= root_slot); + self.last_root_slot = root_slot; + } + } + is_completed + } + + /// Return the count of slot_timestamps in tracking + pub fn slot_count(&self) -> usize { + self.slot_timestamps.len() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + /// Test poh_timing_reporter + fn test_poh_timing_reporter() { + // create a reporter + let mut reporter = PohTimingReporter::default(); + + // process all relevant PohTimingPoints for slot 42 + let complete = reporter.process(42, None, PohTimingPoint::PohSlotStart(100)); + assert!(!complete); + let complete = reporter.process(42, None, PohTimingPoint::PohSlotEnd(200)); + assert!(!complete); + let complete = reporter.process(42, None, PohTimingPoint::FullSlotReceived(150)); + // assert that the PohTiming is complete + assert!(complete); + + // Move root to slot 43 + let root = Some(43); + + // process all relevant PohTimingPoints for slot 45 + let complete = reporter.process(45, None, PohTimingPoint::PohSlotStart(100)); + assert!(!complete); + let complete = reporter.process(45, None, PohTimingPoint::PohSlotEnd(200)); + assert!(!complete); + let complete = reporter.process(45, root, PohTimingPoint::FullSlotReceived(150)); + // assert that the PohTiming is complete + assert!(complete); + + // assert that only one 
timestamp remains in track + assert_eq!(reporter.slot_count(), 1) + } + + #[test] + /// Test poh_timing_reporter + fn test_poh_timing_reporter_out_of_order() { + // create a reporter + let mut reporter = PohTimingReporter::default(); + + // process all relevant PohTimingPoints for slot 42/43 out of order + let mut c = 0; + // slot_start 42 + c += reporter.process(42, None, PohTimingPoint::PohSlotStart(100)) as i32; + // slot_full 42 + c += reporter.process(42, None, PohTimingPoint::FullSlotReceived(120)) as i32; + // slot_full 43 + c += reporter.process(43, None, PohTimingPoint::FullSlotReceived(140)) as i32; + // slot_end 42 + c += reporter.process(42, None, PohTimingPoint::PohSlotEnd(200)) as i32; + // slot start 43 + c += reporter.process(43, None, PohTimingPoint::PohSlotStart(100)) as i32; + // slot end 43 + c += reporter.process(43, None, PohTimingPoint::PohSlotEnd(200)) as i32; + + // assert that both timing points are complete + assert_eq!(c, 2); + + // assert that both timestamps remain in track + assert_eq!(reporter.slot_count(), 2) + } + + #[test] + /// Test poh_timing_reporter + fn test_poh_timing_reporter_never_complete() { + // create a reporter + let mut reporter = PohTimingReporter::default(); + + let mut c = 0; + + // process all relevant PohTimingPoints for slot 42/43 out of order + // slot_start 42 + c += reporter.process(42, None, PohTimingPoint::PohSlotStart(100)) as i32; + + // slot_full 42 + c += reporter.process(42, None, PohTimingPoint::FullSlotReceived(120)) as i32; + + // slot_full 43 + c += reporter.process(43, None, PohTimingPoint::FullSlotReceived(140)) as i32; + + // skip slot 42, jump to slot 43 + // slot start 43 + c += reporter.process(43, None, PohTimingPoint::PohSlotStart(100)) as i32; + + // slot end 43 + c += reporter.process(43, None, PohTimingPoint::PohSlotEnd(200)) as i32; + + // assert that only one timing point is complete + assert_eq!(c, 1); + + // assert that both timestamp is in track + assert_eq!(reporter.slot_count(), 2) + } + + #[test] + fn test_poh_timing_reporter_overflow() { + // create a reporter + let mut reporter = PohTimingReporter::default(); + + // process all relevant PohTimingPoints for a slot + let complete = reporter.process(42, None, PohTimingPoint::PohSlotStart(1647624609896)); + assert!(!complete); + let complete = reporter.process(42, None, PohTimingPoint::PohSlotEnd(1647624610286)); + assert!(!complete); + let complete = reporter.process(42, None, PohTimingPoint::FullSlotReceived(1647624610281)); + + // assert that the PohTiming is complete + assert!(complete); + } + + #[test] + fn test_slot_poh_timestamp_fmt() { + let t = SlotPohTimestamp::default(); + assert_eq!(format!("{}", t), "SlotPohTimestamp: start=0 end=0 full=0"); + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index bb1fbab9a..29f50b9ab 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -8,6 +8,7 @@ use { cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, consensus::{reconcile_blockstore_roots_with_tower, Tower}, + poh_timing_report_service::PohTimingReportService, rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService}, sample_performance_service::SamplePerformanceService, serve_repair::ServeRepair, @@ -44,7 +45,7 @@ use { leader_schedule_cache::LeaderScheduleCache, }, solana_measure::measure::Measure, - solana_metrics::datapoint_info, + solana_metrics::{datapoint_info, poh_timing_point::PohTimingSender}, solana_poh::{ poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, 
MAX_GRACE_SLOTS}, poh_service::{self, PohService}, @@ -323,6 +324,7 @@ pub struct Validator { cache_block_meta_service: Option, system_monitor_service: Option, sample_performance_service: Option, + poh_timing_report_service: PohTimingReportService, stats_reporter_service: StatsReporterService, gossip_service: GossipService, serve_repair_service: ServeRepairService, @@ -482,6 +484,10 @@ impl Validator { !config.no_os_network_stats_reporting, )); + let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded(); + let poh_timing_report_service = + PohTimingReportService::new(poh_timing_point_receiver, exit.clone()); + let ( genesis_config, mut bank_forks, @@ -508,6 +514,7 @@ impl Validator { &start_progress, accounts_update_notifier, transaction_notifier, + Some(poh_timing_point_sender.clone()), ); let last_full_snapshot_slot = process_blockstore( @@ -525,7 +532,6 @@ impl Validator { last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0)); maybe_warp_slot(config, ledger_path, &mut bank_forks, &leader_schedule_cache); - let tower = { let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id); if let Ok(tower) = &restored_tower { @@ -653,6 +659,7 @@ impl Validator { blockstore.new_shreds_signals.first().cloned(), &leader_schedule_cache, &poh_config, + Some(poh_timing_point_sender), exit.clone(), ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); @@ -973,6 +980,7 @@ impl Validator { cache_block_meta_service, system_monitor_service, sample_performance_service, + poh_timing_report_service, snapshot_packager_service, completed_data_sets_service, tpu, @@ -1109,6 +1117,10 @@ impl Validator { if let Some(geyser_plugin_service) = self.geyser_plugin_service { geyser_plugin_service.join().expect("geyser_plugin_service"); } + + self.poh_timing_report_service + .join() + .expect("poh_timing_report_service"); } } @@ -1247,6 +1259,7 @@ fn load_blockstore( start_progress: &Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + poh_timing_point_sender: Option, ) -> ( GenesisConfig, BankForks, @@ -1301,6 +1314,7 @@ fn load_blockstore( ) .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); + blockstore.shred_timing_point_sender = poh_timing_point_sender; let blockstore = Arc::new(blockstore); let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index aefe623b7..30ad490bb 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -27,7 +27,10 @@ use { rocksdb::DBRawIterator, solana_entry::entry::{create_ticks, Entry}, solana_measure::measure::Measure, - solana_metrics::{datapoint_debug, datapoint_error}, + solana_metrics::{ + datapoint_debug, datapoint_error, + poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, + }, solana_rayon_threadlimit::get_thread_count, solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, solana_sdk::{ @@ -173,6 +176,7 @@ pub struct Blockstore { insert_shreds_lock: Mutex<()>, pub new_shreds_signals: Vec>, pub completed_slots_senders: Vec, + pub shred_timing_point_sender: Option, pub lowest_cleanup_slot: RwLock, no_compaction: bool, slots_stats: Mutex, @@ -442,6 +446,7 @@ impl Blockstore { bank_hash_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], + shred_timing_point_sender: None, insert_shreds_lock: Mutex::<()>::default(), last_root, lowest_cleanup_slot: RwLock::::default(), @@ 
-1548,6 +1553,20 @@ impl Blockstore { .unwrap_or_default() } + /// send slot full timing point to poh_timing_report service + fn send_slot_full_timing(&self, slot: Slot) { + if let Some(ref sender) = self.shred_timing_point_sender { + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_full_poh_time_point( + slot, + Some(self.last_root()), + solana_sdk::timing::timestamp(), + ), + ); + } + } + fn insert_data_shred( &self, slot_meta: &mut SlotMeta, @@ -1619,7 +1638,14 @@ impl Blockstore { slots_stats.set_full(slot_meta); } } + + // slot is full, send slot full timing to poh_timing_report service. + if slot_meta.is_full() { + self.send_slot_full_timing(slot); + } + trace!("inserted shred into slot {:?} and index {:?}", slot, index); + Ok(newly_completed_data_sets) } diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 7f5121317..80e999c75 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -2,6 +2,7 @@ pub mod counter; pub mod datapoint; mod metrics; +pub mod poh_timing_point; pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit}; use std::sync::Arc; diff --git a/metrics/src/poh_timing_point.rs b/metrics/src/poh_timing_point.rs new file mode 100644 index 000000000..d48fddc79 --- /dev/null +++ b/metrics/src/poh_timing_point.rs @@ -0,0 +1,173 @@ +//! A poh_timing_point module + +use { + crossbeam_channel::{Receiver, Sender}, + log::*, + solana_sdk::clock::Slot, + std::fmt, +}; + +/// Receiver of SlotPohTimingInfo from the channel +pub type PohTimingReceiver = Receiver; + +/// Sender of SlotPohTimingInfo to the channel +pub type PohTimingSender = Sender; + +/// PohTimingPoint. Each TimingPoint is annotated with a timestamp in milliseconds. +#[derive(Debug, Clone, PartialEq)] +pub enum PohTimingPoint { + PohSlotStart(u64), + PohSlotEnd(u64), + FullSlotReceived(u64), +} + +impl fmt::Display for PohTimingPoint { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + PohTimingPoint::PohSlotStart(t) => write!(f, "poh_start={}", t), + PohTimingPoint::PohSlotEnd(t) => write!(f, "poh_end ={}", t), + PohTimingPoint::FullSlotReceived(t) => write!(f, "poh_full ={}", t), + } + } +} + +/// SlotPohTimingInfo. This struct is sent to channel and received by +/// poh_timing_report service. 
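+// Added comment (not part of the original patch): producers are poh_recorder,
+// which emits the PohSlotStart/PohSlotEnd points, and blockstore, which emits
+// FullSlotReceived once a slot's last shred lands; the single consumer is the
+// PohTimingReportService thread, which folds the three points into one
+// "poh_slot_timing" datapoint per slot.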
+#[derive(Clone, Debug)] +pub struct SlotPohTimingInfo { + /// current slot + pub slot: Slot, + /// root slot + pub root_slot: Option, + /// timing event + pub timing_point: PohTimingPoint, +} + +impl fmt::Display for SlotPohTimingInfo { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "PohTimingPoint: {}, slot={}, root_slot={}", + self.timing_point, + self.slot, + self.root_slot.unwrap_or(0), + ) + } +} + +impl SlotPohTimingInfo { + /// create slot start poh timing point + pub fn new_slot_start_poh_time_point( + slot: Slot, + root_slot: Option, + timestamp: u64, + ) -> SlotPohTimingInfo { + SlotPohTimingInfo { + slot, + root_slot, + timing_point: PohTimingPoint::PohSlotStart(timestamp), + } + } + + /// create slot end poh timing point + pub fn new_slot_end_poh_time_point( + slot: Slot, + root_slot: Option, + timestamp: u64, + ) -> SlotPohTimingInfo { + SlotPohTimingInfo { + slot, + root_slot, + timing_point: PohTimingPoint::PohSlotEnd(timestamp), + } + } + + /// create slot full poh timing point + pub fn new_slot_full_poh_time_point( + slot: Slot, + root_slot: Option, + timestamp: u64, + ) -> SlotPohTimingInfo { + SlotPohTimingInfo { + slot, + root_slot, + timing_point: PohTimingPoint::FullSlotReceived(timestamp), + } + } +} + +/// send poh timing to channel +pub fn send_poh_timing_point(sender: &PohTimingSender, slot_timing: SlotPohTimingInfo) { + trace!("{}", slot_timing); + if let Err(e) = sender.try_send(slot_timing) { + info!("failed to send slot poh timing {:?}", e); + } +} + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn test_poh_timing_point() { + // create slot start with root + let p = SlotPohTimingInfo::new_slot_start_poh_time_point(100, Some(101), 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, Some(101)); + assert_eq!(p.timing_point, PohTimingPoint::PohSlotStart(100)); + assert_eq!( + format!("{}", p), + "PohTimingPoint: poh_start=100, slot=100, root_slot=101" + ); + + // create slot start without root + let p = SlotPohTimingInfo::new_slot_start_poh_time_point(100, None, 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, None); + assert_eq!(p.timing_point, PohTimingPoint::PohSlotStart(100)); + assert_eq!( + format!("{}", p), + "PohTimingPoint: poh_start=100, slot=100, root_slot=0" + ); + + // create slot end with root + let p = SlotPohTimingInfo::new_slot_end_poh_time_point(100, Some(101), 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, Some(101)); + assert_eq!(p.timing_point, PohTimingPoint::PohSlotEnd(100)); + assert_eq!( + format!("{}", p), + "PohTimingPoint: poh_end =100, slot=100, root_slot=101" + ); + + // create slot end without root + let p = SlotPohTimingInfo::new_slot_end_poh_time_point(100, None, 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, None); + assert_eq!(p.timing_point, PohTimingPoint::PohSlotEnd(100)); + assert_eq!( + format!("{}", p), + "PohTimingPoint: poh_end =100, slot=100, root_slot=0" + ); + + // create slot full with root + let p = SlotPohTimingInfo::new_slot_full_poh_time_point(100, Some(101), 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, Some(101)); + assert_eq!(p.timing_point, PohTimingPoint::FullSlotReceived(100)); + assert_eq!( + format!("{}", p), + "PohTimingPoint: poh_full =100, slot=100, root_slot=101" + ); + + // create slot full without root + let p = SlotPohTimingInfo::new_slot_full_poh_time_point(100, None, 100); + assert!(p.slot == 100); + assert_eq!(p.root_slot, None); + assert_eq!(p.timing_point, PohTimingPoint::FullSlotReceived(100)); + + assert_eq!( 
+ format!("{}", p), + "PohTimingPoint: poh_full =100, slot=100, root_slot=0" + ); + } +} diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 480d377d2..ab0e7ff21 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -22,6 +22,7 @@ use { leader_schedule_cache::LeaderScheduleCache, }, solana_measure::measure::Measure, + solana_metrics::poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo}, solana_runtime::bank::Bank, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, hash::Hash, poh_config::PohConfig, pubkey::Pubkey, @@ -207,6 +208,7 @@ pub struct PohRecorder { tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height working_bank: Option, sender: Sender, + poh_timing_point_sender: Option, leader_first_tick_height_including_grace_ticks: Option, leader_last_tick_height: u64, // zero if none grace_ticks: u64, @@ -306,6 +308,20 @@ impl PohRecorder { }) } + pub fn working_bank_end_slot(&self) -> Option { + self.working_bank.as_ref().and_then(|w| { + if w.max_tick_height == self.tick_height { + Some(w.bank.slot()) + } else { + None + } + }) + } + + pub fn working_slot(&self) -> Option { + self.working_bank.as_ref().map(|w| w.bank.slot()) + } + pub fn has_bank(&self) -> bool { self.working_bank.is_some() } @@ -451,6 +467,18 @@ impl PohRecorder { self.tick_height = (self.start_slot() + 1) * self.ticks_per_slot; self.start_tick_height = self.tick_height + 1; + if let Some(ref sender) = self.poh_timing_point_sender { + // start_slot() is the parent slot. current slot is start_slot() + 1. + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_start_poh_time_point( + self.start_slot() + 1, + None, + solana_sdk::timing::timestamp(), + ), + ); + } + let (leader_first_tick_height_including_grace_ticks, leader_last_tick_height, grace_ticks) = Self::compute_leader_slot_tick_heights(next_leader_slot, self.ticks_per_slot); self.grace_ticks = grace_ticks; @@ -469,6 +497,21 @@ impl PohRecorder { trace!("new working bank"); assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot()); self.working_bank = Some(working_bank); + + // send poh slot start timing point + if let Some(ref sender) = self.poh_timing_point_sender { + if let Some(slot) = self.working_slot() { + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_start_poh_time_point( + slot, + None, + solana_sdk::timing::timestamp(), + ), + ); + } + } + // TODO: adjust the working_bank.start time based on number of ticks // that have already elapsed based on current tick height. 
let _ = self.flush_cache(false); @@ -539,6 +582,62 @@ impl PohRecorder { Ok(()) } + fn report_poh_timing_point_by_tick(&self) { + match self.tick_height % self.ticks_per_slot { + // reaching the end of the slot + 0 => { + if let Some(ref sender) = self.poh_timing_point_sender { + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_end_poh_time_point( + self.slot_for_tick_height(self.tick_height), + None, + solana_sdk::timing::timestamp(), + ), + ); + } + } + // beginning of a slot + 1 => { + if let Some(ref sender) = self.poh_timing_point_sender { + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_start_poh_time_point( + self.slot_for_tick_height(self.tick_height), + None, + solana_sdk::timing::timestamp(), + ), + ); + } + } + _ => {} + } + } + + fn report_poh_timing_point_by_working_bank(&self, slot: Slot) { + if let Some(ref sender) = self.poh_timing_point_sender { + send_poh_timing_point( + sender, + SlotPohTimingInfo::new_slot_end_poh_time_point( + slot, + None, + solana_sdk::timing::timestamp(), + ), + ); + } + } + + fn report_poh_timing_point(&self) { + // send poh slot end timing point + if let Some(slot) = self.working_bank_end_slot() { + // bank producer + self.report_poh_timing_point_by_working_bank(slot) + } else { + // validator + self.report_poh_timing_point_by_tick() + } + } + pub fn tick(&mut self) { let ((poh_entry, target_time), tick_lock_contention_time) = Measure::this( |_| { @@ -559,6 +658,7 @@ impl PohRecorder { if let Some(poh_entry) = poh_entry { self.tick_height += 1; trace!("tick_height {}", self.tick_height); + self.report_poh_timing_point(); if self .leader_first_tick_height_including_grace_ticks @@ -706,6 +806,7 @@ impl PohRecorder { clear_bank_signal: Option>, leader_schedule_cache: &Arc, poh_config: &Arc, + poh_timing_point_sender: Option, is_exited: Arc, ) -> (Self, Receiver, Receiver) { let tick_number = 0; @@ -731,6 +832,7 @@ impl PohRecorder { tick_cache: vec![], working_bank: None, sender, + poh_timing_point_sender, clear_bank_signal, start_bank, start_tick_height: tick_height + 1, @@ -788,6 +890,7 @@ impl PohRecorder { None, leader_schedule_cache, poh_config, + None, is_exited, ) } @@ -1488,6 +1591,7 @@ mod tests { Some(sender), &Arc::new(LeaderScheduleCache::default()), &Arc::new(PohConfig::default()), + None, Arc::new(AtomicBool::default()), ); poh_recorder.set_bank(&bank);
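
Usage sketch (reviewer addition, not part of the patch): the unit test in poh_timing_report_service.rs above already exercises the end-to-end path; the standalone wiring below restates it using only the public API introduced by this patch. The crate paths (solana_core, solana_metrics) and the literal slot/timestamp values are illustrative assumptions, not production wiring.

use {
    crossbeam_channel::unbounded,
    solana_core::poh_timing_report_service::PohTimingReportService,
    solana_metrics::poh_timing_point::{send_poh_timing_point, SlotPohTimingInfo},
    std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

fn main() {
    // Channel carrying SlotPohTimingInfo from the producers (poh_recorder and
    // blockstore in the validator) to the reporting service.
    let (timing_sender, timing_receiver) = unbounded();
    let exit = Arc::new(AtomicBool::new(false));

    // Spawn the background reporting thread.
    let service = PohTimingReportService::new(timing_receiver, exit.clone());

    // Emit the three points for a hypothetical slot 42; once all three arrive,
    // PohTimingReporter reports a "poh_slot_timing" datapoint for the slot.
    send_poh_timing_point(
        &timing_sender,
        SlotPohTimingInfo::new_slot_start_poh_time_point(42, None, 100),
    );
    send_poh_timing_point(
        &timing_sender,
        SlotPohTimingInfo::new_slot_full_poh_time_point(42, None, 150),
    );
    send_poh_timing_point(
        &timing_sender,
        SlotPohTimingInfo::new_slot_end_poh_time_point(42, None, 200),
    );

    // Shut down, mirroring the unit test: the receive loop polls the exit flag
    // with a 1s recv_timeout, so join() returns shortly after the flag is set.
    exit.store(true, Ordering::Relaxed);
    service.join().expect("poh_timing_report_service");
}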