add websocket timeout

This commit is contained in:
GroovieGermanikus 2024-08-09 11:18:52 +02:00
parent 2cfe24feb3
commit 294094116b
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 34 additions and 13 deletions

View File

@ -13,8 +13,11 @@ use std::collections::{HashMap, HashSet};
use std::env; use std::env;
use std::pin::pin; use std::pin::pin;
use std::time::Duration; use std::time::Duration;
use tokio::time::Instant; use tokio::sync::broadcast::error::RecvError;
use tokio::time::{Instant, sleep, timeout, Timeout};
use tokio::time::error::Elapsed;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::{error, warn};
use url::Url; use url::Url;
use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage}; use websocket_tungstenite_retry::websocket_stable::{StableWebSocket, WsMessage};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -110,6 +113,7 @@ async fn main() {
)); ));
let mut latest_slot_per_source: HashMap<SlotSource, Slot> = HashMap::new(); let mut latest_slot_per_source: HashMap<SlotSource, Slot> = HashMap::new();
while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await { while let Some(SlotDatapoint { slot, source, .. }) = slots_rx.recv().await {
// println!("Slot from {:?}: {}", source, slot); // println!("Slot from {:?}: {}", source, slot);
latest_slot_per_source.insert(source, slot); latest_slot_per_source.insert(source, slot);
@ -173,22 +177,39 @@ async fn websocket_source(
let mut channel = ws1.subscribe_message_channel(); let mut channel = ws1.subscribe_message_channel();
while let Ok(msg) = channel.recv().await { // the timeout is a workaround as we see the websocket source starving with no data
if let WsMessage::Text(payload) = msg { loop {
let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> = match timeout(Duration::from_millis(5000), channel.recv()).await {
serde_json::from_str(&payload).unwrap(); Ok(Ok(WsMessage::Text(payload))) => {
let slot_info = ws_result.params.result; let ws_result: jsonrpsee_types::SubscriptionResponse<SlotInfo> =
match mpsc_downstream serde_json::from_str(&payload).unwrap();
.send(SlotDatapoint::new(slot_source.clone(), slot_info.slot)) let slot_info = ws_result.params.result;
.await match mpsc_downstream
{ .send(SlotDatapoint::new(slot_source.clone(), slot_info.slot))
Ok(_) => {} .await
Err(_) => panic!("downstream error"), {
Ok(_) => {}
Err(_) => panic!("downstream error"),
}
}
Ok(Ok(WsMessage::Binary(_))) => {
panic!("Unexpected binary message from websocket source");
}
Err(_elapsed) => {
warn!("Websocket source timeout unexpectedly; continue waiting but there's little hope");
// throttle a bit
sleep(Duration::from_millis(500)).await;
}
Ok(Err(RecvError::Lagged(_))) => {
warn!("Websocket broadcast channel lagged - continue");
}
Ok(Err(RecvError::Closed)) => {
panic!("Websocket broadcast channel closed - should never happen");
} }
} }
} }
panic!("Websocket source ended unexpectedly"); // unreachable code
} }
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter" // note: this might fail if the yellowstone plugin does not allow "any broadcast filter"