diff --git a/src/slot_latency_tester.rs b/src/slot_latency_tester.rs index 82fb330..fc04ecb 100644 --- a/src/slot_latency_tester.rs +++ b/src/slot_latency_tester.rs @@ -13,8 +13,11 @@ use std::collections::{HashMap, HashSet}; use std::env; use std::pin::pin; use std::time::Duration; -use tokio::time::Instant; +use tokio::sync::broadcast::error::RecvError; +use tokio::time::{Instant, sleep, timeout, Timeout}; +use tokio::time::error::Elapsed; use tokio_stream::StreamExt; +use tracing::{error, warn}; use url::Url; use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -110,6 +113,7 @@ async fn main() { )); let mut latest_slot_per_source: HashMap = HashMap::new(); + while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await { // println!("Slot from {:?}: {}", source, slot); latest_slot_per_source.insert(source, slot); @@ -173,22 +177,39 @@ async fn websocket_source( let mut channel = ws1.subscribe_message_channel(); - while let Ok(msg) = channel.recv().await { - if let WsMessage::Text(payload) = msg { - let ws_result: jsonrpsee_types::SubscriptionResponse = - serde_json::from_str(&payload).unwrap(); - let slot_info = ws_result.params.result; - match mpsc_downstream - .send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)) - .await - { - Ok(_) => {} - Err(_) => panic!("downstream error"), + // the timeout is a workaround as we see the websocket source starving with no data + loop { + match timeout(Duration::from_millis(5000), channel.recv()).await { + Ok(Ok(WsMessage::Text(payload))) => { + let ws_result: jsonrpsee_types::SubscriptionResponse = + serde_json::from_str(&payload).unwrap(); + let slot_info = ws_result.params.result; + match mpsc_downstream + .send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)) + .await + { + Ok(_) => {} + Err(_) => panic!("downstream error"), + } + } + Ok(Ok(WsMessage::Binary(_))) => { + panic!("Unexpected binary message from websocket source"); + } + Err(_elapsed) => { + warn!("Websocket source timeout unexpectedly; continue waiting but there's little hope"); + // throttle a bit + sleep(Duration::from_millis(500)).await; + } + Ok(Err(RecvError::Lagged(_))) => { + warn!("Websocket broadcast channel lagged - continue"); + } + Ok(Err(RecvError::Closed)) => { + panic!("Websocket broadcast channel closed - should never happen"); } } } - panic!("Websocket source ended unexpectedly"); + // unreachable code } // note: this might fail if the yellowstone plugin does not allow "any broadcast filter"