replace Option<SubscribeUpdate>

This commit is contained in:
GroovieGermanikus 2023-12-15 12:12:06 +01:00
parent dfed859d0c
commit c630c9ed56
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 22 additions and 15 deletions

View File

@ -1,3 +1,4 @@
use crate::grpc_subscription_autoreconnect::Message::Reconnecting;
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
@ -50,6 +51,11 @@ impl GrpcSourceConfig {
}
}
pub enum Message {
GeyserSubscribeUpdate(SubscribeUpdate),
Reconnecting,
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected,
Connecting(JoinHandle<GeyserGrpcClientResult<S>>),
@ -62,8 +68,7 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_config: CommitmentConfig,
// TODO do we want Option<SubscribeUpdate>
) -> impl Stream<Item = Option<SubscribeUpdate>> {
) -> impl Stream<Item = Message> {
let label = grpc_source.label.clone();
// solana_sdk -> yellowstone
@ -148,18 +153,18 @@ pub fn create_geyser_reconnecting_stream(
}
});
(ConnectionState::Connecting(connection_task), None)
(ConnectionState::Connecting(connection_task), Message::Reconnecting)
}
ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await;
match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), None),
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), Message::Reconnecting),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
(ConnectionState::WaitReconnect, None)
(ConnectionState::WaitReconnect, Message::Reconnecting)
},
Err(geyser_grpc_task_error) => {
panic!("Task aborted - should not happen :{geyser_grpc_task_error}");
@ -173,17 +178,17 @@ pub fn create_geyser_reconnecting_stream(
match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", label);
(ConnectionState::Ready(geyser_stream), Some(update_message))
(ConnectionState::Ready(geyser_stream), Message::GeyserSubscribeUpdate(update_message))
}
Some(Err(tonic_status)) => {
// TODO identify non-recoverable errors and cancel stream
debug!("! error on {} - retrying: {:?}", label, tonic_status);
(ConnectionState::WaitReconnect, None)
(ConnectionState::WaitReconnect, Message::Reconnecting)
}
None => {
//TODO should not arrive. Mean the stream close.
warn!("! geyser stream closed on {} - retrying", label);
(ConnectionState::WaitReconnect, None)
(ConnectionState::WaitReconnect, Message::Reconnecting)
}
}
@ -192,7 +197,7 @@ pub fn create_geyser_reconnecting_stream(
ConnectionState::WaitReconnect => {
info!("Waiting a bit, then connect to {}", label);
sleep(Duration::from_secs(1)).await;
(ConnectionState::NotConnected, None)
(ConnectionState::NotConnected, Message::Reconnecting)
}
}; // -- match

View File

@ -1,6 +1,8 @@
use crate::grpc_subscription_autoreconnect::Message;
use crate::grpc_subscription_autoreconnect::Message::GeyserSubscribeUpdate;
use async_stream::stream;
use futures::Stream;
use log::{debug, info};
use log::{debug, info, warn};
use merge_streams::MergeStreams;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
@ -15,7 +17,7 @@ pub trait FromYellowstoneMapper {
/// use streams created by ``create_geyser_reconnecting_stream``
/// note: this is agnostic to the type of the stream
pub fn create_multiplex<M>(
grpc_source_streams: Vec<impl Stream<Item = Option<SubscribeUpdate>>>,
grpc_source_streams: Vec<impl Stream<Item = Message>>,
commitment_config: CommitmentConfig,
mapper: M,
) -> impl Stream<Item = M::Target>
@ -53,14 +55,14 @@ where
fn map_updates<S, E>(merged_streams: S, mapper: E) -> impl Stream<Item = E::Target>
where
S: Stream<Item = Option<SubscribeUpdate>>,
S: Stream<Item = Message>,
E: FromYellowstoneMapper,
{
let mut tip: Slot = 0;
stream! {
for await update in merged_streams {
match update {
Some(update) => {
GeyserSubscribeUpdate(update) => {
// take only the update messages we want
if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) {
if proposed_slot > tip {
@ -69,8 +71,8 @@ where
}
}
}
None => {
debug!("Stream sent None"); // TODO waht does that mean?
Message::Reconnecting => {
warn!("Stream performs reconnect"); // TODO waht does that mean?
}
}
}