track source lagg

This commit is contained in:
GroovieGermanikus 2024-08-09 12:22:52 +02:00
parent 294094116b
commit d66ba825e5
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 24 additions and 5 deletions

View File

@ -113,12 +113,14 @@ async fn main() {
)); ));
let mut latest_slot_per_source: HashMap<SlotSource, Slot> = HashMap::new(); let mut latest_slot_per_source: HashMap<SlotSource, Slot> = HashMap::new();
let mut update_timestamp_per_source: HashMap<SlotSource, Instant> = HashMap::new();
while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await { while let Some(SlotDatapoint { slot, source, timestamp: update_timestamp }) = slots_rx.recv().await {
// println!("Slot from {:?}: {}", source, slot); // println!("Slot from {:?}: {}", source, slot);
latest_slot_per_source.insert(source, slot); latest_slot_per_source.insert(source.clone(), slot);
update_timestamp_per_source.insert(source.clone(), update_timestamp);
visualize_slots(&latest_slot_per_source).await; visualize_slots(&latest_slot_per_source, &update_timestamp_per_source).await;
// if Instant::now().duration_since(started_at) > Duration::from_secs(10) { // if Instant::now().duration_since(started_at) > Duration::from_secs(10) {
// break; // break;
@ -260,9 +262,19 @@ pub fn slots() -> SubscribeRequest {
} }
} }
async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) { const STALE_SOURCE_TIMEOUT: Duration = Duration::from_millis(3000);
async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>, update_timestamp_per_source: &HashMap<SlotSource, Instant>) {
// println!("Slots: {:?}", latest_slot_per_source); // println!("Slots: {:?}", latest_slot_per_source);
let threshold = Instant::now() - STALE_SOURCE_TIMEOUT;
let stale_sources: HashSet<SlotSource> = update_timestamp_per_source.iter()
.filter(|(source, &updated_timestamp)| updated_timestamp < threshold)
.map(|(source, _)| source)
.cloned()
.collect();
let map_source_by_name: HashMap<String, SlotSource> = enum_iterator::all::<SlotSource>() let map_source_by_name: HashMap<String, SlotSource> = enum_iterator::all::<SlotSource>()
.map(|check| (format!("{:?}", check), check)) .map(|check| (format!("{:?}", check), check))
.collect(); .collect();
@ -283,7 +295,8 @@ async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) {
for i in 0..(sorted_by_time.len() + deltas.len()) { for i in 0..(sorted_by_time.len() + deltas.len()) {
if i % 2 == 0 { if i % 2 == 0 {
let (source, slot) = sorted_by_time.get(i / 2).unwrap(); let (source, slot) = sorted_by_time.get(i / 2).unwrap();
print!("{}({:?})", slot, source); let staleness_marker = if stale_sources.contains(source) { "!!" } else { "" };
print!("{staleness_marker}{slot}({source:?}){staleness_marker}");
} else { } else {
let edge = *deltas.get(i / 2).unwrap(); let edge = *deltas.get(i / 2).unwrap();
if edge == 0 { if edge == 0 {
@ -306,6 +319,12 @@ async fn visualize_slots(latest_slot_per_source: &HashMap<SlotSource, Slot>) {
print!(" // no data from {:?}", no_data_sources); print!(" // no data from {:?}", no_data_sources);
} }
if stale_sources.is_empty() {
print!(", no stale sources");
} else {
print!(", {} stale sources (threshold={:?})", stale_sources.len(), STALE_SOURCE_TIMEOUT);
}
println!(); println!();
// print!("{}[2K\r", 27 as char); // print!("{}[2K\r", 27 as char);
} }