CHecking stream for timeouts. (#18)
This commit is contained in:
parent
21ab222099
commit
d7969a8087
25
src/main.rs
25
src/main.rs
|
@ -5,6 +5,7 @@ use std::{
|
|||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use yellowstone_grpc_proto_original::geyser::SubscribeRequestPing;
|
||||
|
||||
use crate::prometheus_sync::PrometheusSync;
|
||||
use block_info::BlockInfo;
|
||||
|
@ -135,7 +136,6 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
}
|
||||
},
|
||||
_=>{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -189,11 +189,26 @@ async fn start_tracking_blocks(
|
|||
Default::default(),
|
||||
None,
|
||||
Default::default(),
|
||||
None,
|
||||
Some(SubscribeRequestPing { id: 0 }),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
while let Some(message) = geyser_stream.next().await {
|
||||
loop {
|
||||
let res = tokio::time::timeout(Duration::from_secs(10), geyser_stream.next()).await;
|
||||
let message_res = if let Ok(message) = res {
|
||||
message
|
||||
} else {
|
||||
// restarting geyser block subscription because of timeout
|
||||
error!("Restarting geyser block subscription because of timeout");
|
||||
break;
|
||||
};
|
||||
let message = if let Some(message) = message_res {
|
||||
message
|
||||
} else {
|
||||
error!("Restarting geyser block subscription because it is broken");
|
||||
break;
|
||||
};
|
||||
|
||||
let Ok(message) = message else {
|
||||
continue;
|
||||
};
|
||||
|
@ -246,9 +261,7 @@ async fn start_tracking_blocks(
|
|||
});
|
||||
// delay queue so that we get all the banking stage errors before processing block
|
||||
}
|
||||
_ => {
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
log::error!("stopping the sidecar, geyser block stream is broken");
|
||||
|
|
Loading…
Reference in New Issue