From d66ba825e5c85d536f545db2dd0f97b577437984 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 9 Aug 2024 12:22:52 +0200 Subject: [PATCH] track source lagg --- src/slot_latency_tester.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/slot_latency_tester.rs b/src/slot_latency_tester.rs index fc04ecb..ad1649a 100644 --- a/src/slot_latency_tester.rs +++ b/src/slot_latency_tester.rs @@ -113,12 +113,14 @@ async fn main() { )); let mut latest_slot_per_source: HashMap = HashMap::new(); + let mut update_timestamp_per_source: HashMap = HashMap::new(); - while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await { + while let Some(SlotDatapoint { slot, source, timestamp: update_timestamp }) = slots_rx.recv().await { // println!("Slot from {:?}: {}", source, slot); - latest_slot_per_source.insert(source, slot); + latest_slot_per_source.insert(source.clone(), slot); + update_timestamp_per_source.insert(source.clone(), update_timestamp); - visualize_slots(&latest_slot_per_source).await; + visualize_slots(&latest_slot_per_source, &update_timestamp_per_source).await; // if Instant::now().duration_since(started_at) > Duration::from_secs(10) { // break; @@ -260,9 +262,19 @@ pub fn slots() -> SubscribeRequest { } } -async fn visualize_slots(latest_slot_per_source: &HashMap) { +const STALE_SOURCE_TIMEOUT: Duration = Duration::from_millis(3000); + +async fn visualize_slots(latest_slot_per_source: &HashMap, update_timestamp_per_source: &HashMap) { // println!("Slots: {:?}", latest_slot_per_source); + let threshold = Instant::now() - STALE_SOURCE_TIMEOUT; + + let stale_sources: HashSet = update_timestamp_per_source.iter() + .filter(|(source, &updated_timestamp)| updated_timestamp < threshold) + .map(|(source, _)| source) + .cloned() + .collect(); + let map_source_by_name: HashMap = enum_iterator::all::() .map(|check| (format!("{:?}", check), check)) .collect(); @@ -283,7 +295,8 @@ async fn visualize_slots(latest_slot_per_source: &HashMap) { for i in 0..(sorted_by_time.len() + deltas.len()) { if i % 2 == 0 { let (source, slot) = sorted_by_time.get(i / 2).unwrap(); - print!("{}({:?})", slot, source); + let staleness_marker = if stale_sources.contains(source) { "!!" } else { "" }; + print!("{staleness_marker}{slot}({source:?}){staleness_marker}"); } else { let edge = *deltas.get(i / 2).unwrap(); if edge == 0 { @@ -306,6 +319,12 @@ async fn visualize_slots(latest_slot_per_source: &HashMap) { print!(" // no data from {:?}", no_data_sources); } + if stale_sources.is_empty() { + print!(", no stale sources"); + } else { + print!(", {} stale sources (threshold={:?})", stale_sources.len(), STALE_SOURCE_TIMEOUT); + } + println!(); // print!("{}[2K\r", 27 as char); }