geyser-grpc-connector/src/grpc_subscription_autorecon...

205 lines
8.4 KiB
Rust
Raw Normal View History

2023-12-15 01:20:41 -08:00
use async_stream::stream;
use futures::{Stream, StreamExt};
2023-12-15 02:59:19 -08:00
use log::{debug, info, log, trace, warn, Level};
2023-12-15 01:20:41 -08:00
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
2023-12-15 01:25:21 -08:00
use std::pin::Pin;
2023-12-15 02:43:00 -08:00
use std::sync::atomic::{AtomicI32, Ordering};
2023-12-15 01:20:41 -08:00
use tokio::task::JoinHandle;
2023-12-15 03:01:29 -08:00
use tokio::time::{sleep, timeout, Duration};
2023-12-15 01:25:21 -08:00
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate};
2023-12-15 01:20:41 -08:00
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::{async_trait, Status};
#[async_trait]
trait GrpcConnectionFactory: Clone {
// async fn connect() -> GeyserGrpcClientResult<impl Stream<Item=Result<SubscribeUpdate, Status>>+Sized>;
2023-12-15 01:25:21 -08:00
async fn connect_and_subscribe(
&self,
) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>>>>>;
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:59:19 -08:00
#[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub subscribe_timeout: Duration,
}
2023-12-15 01:20:41 -08:00
#[derive(Clone, Debug)]
pub struct GrpcSourceConfig {
// symbolic name used in logs
pub label: String,
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
2023-12-15 02:59:19 -08:00
timeouts: Option<GrpcConnectionTimeouts>,
2023-12-15 01:20:41 -08:00
}
impl GrpcSourceConfig {
pub fn new(label: String, grpc_addr: String, grpc_x_token: Option<String>) -> Self {
Self {
label,
grpc_addr,
grpc_x_token,
tls_config: None,
2023-12-15 02:59:19 -08:00
timeouts: None,
2023-12-15 01:20:41 -08:00
}
}
}
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected,
Connecting(JoinHandle<GeyserGrpcClientResult<S>>),
Ready(S),
WaitReconnect,
}
// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_config: CommitmentConfig,
// TODO do we want Option<SubscribeUpdate>
) -> impl Stream<Item = Option<SubscribeUpdate>> {
let label = grpc_source.label.clone();
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
2023-12-15 01:25:21 -08:00
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
2023-12-15 01:20:41 -08:00
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
};
let mut state = ConnectionState::NotConnected;
2023-12-15 02:59:19 -08:00
let connection_attempts = AtomicI32::new(1);
2023-12-15 01:20:41 -08:00
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
stream! {
2023-12-15 02:43:00 -08:00
loop {
2023-12-15 01:20:41 -08:00
let yield_value;
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
(state, yield_value) = match state {
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
ConnectionState::NotConnected => {
let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
2023-12-15 02:59:19 -08:00
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
2023-12-15 01:20:41 -08:00
// let (block_filter, blockmeta_filter) = blocks_filters.clone();
2023-12-15 02:59:19 -08:00
let attempt = connection_attempts.fetch_add(1, Ordering::Relaxed);
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
2023-12-15 01:20:41 -08:00
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
2023-12-15 02:59:19 -08:00
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
2023-12-15 01:20:41 -08:00
let mut client = connect_result?;
// TODO make filter configurable for caller
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocksMeta {},
);
// Connected;
2023-12-15 01:25:21 -08:00
2023-12-15 02:59:19 -08:00
timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
blocksmeta_subs,
Some(commitment_level),
Default::default(),
None,
))
.await.expect("timeout on subscribe_once")
2023-12-15 01:20:41 -08:00
}
});
(ConnectionState::Connecting(connection_task), None)
}
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await;
match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), None),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
(ConnectionState::WaitReconnect, None)
},
Err(geyser_grpc_task_error) => {
panic!("Task aborted - should not happen :{geyser_grpc_task_error}");
}
}
}
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
ConnectionState::Ready(mut geyser_stream) => {
2023-12-15 01:34:05 -08:00
match geyser_stream.next().await {
Some(Ok(update_message)) => {
2023-12-15 02:43:00 -08:00
trace!("> recv update message from {}", label);
2023-12-15 01:34:05 -08:00
(ConnectionState::Ready(geyser_stream), Some(update_message))
2023-12-15 01:20:41 -08:00
}
2023-12-15 01:34:05 -08:00
Some(Err(tonic_status)) => {
// TODO identify non-recoverable errors and cancel stream
debug!("! error on {} - retrying: {:?}", label, tonic_status);
(ConnectionState::WaitReconnect, None)
}
None => {
//TODO should not arrive. Mean the stream close.
warn!("! geyser stream closed on {} - retrying", label);
(ConnectionState::WaitReconnect, None)
}
}
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
ConnectionState::WaitReconnect => {
2023-12-15 02:43:00 -08:00
info!("Waiting a bit, then connect to {}", label);
2023-12-15 01:20:41 -08:00
sleep(Duration::from_secs(1)).await;
(ConnectionState::NotConnected, None)
}
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
}; // -- match
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
yield yield_value
}
} // -- stream!
}