geyser-grpc-connector/src/grpc_subscription_autorecon...

269 lines
10 KiB
Rust
Raw Normal View History

2023-12-15 01:20:41 -08:00
use async_stream::stream;
use futures::{Stream, StreamExt};
2023-12-15 02:59:19 -08:00
use log::{debug, info, log, trace, warn, Level};
2023-12-19 05:19:27 -08:00
use solana_sdk::commitment_config::CommitmentConfig;
2023-12-15 01:20:41 -08:00
use std::collections::HashMap;
2023-12-19 03:44:49 -08:00
use std::fmt::Display;
2023-12-15 01:25:21 -08:00
use std::pin::Pin;
use std::time::Duration;
2023-12-15 01:20:41 -08:00
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
2023-12-15 01:25:21 -08:00
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
2023-12-20 22:40:18 -08:00
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate};
2023-12-15 01:20:41 -08:00
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::{async_trait, Status};
#[async_trait]
trait GrpcConnectionFactory: Clone {
2023-12-19 05:19:27 -08:00
async fn connect_and_subscribe(
&self,
) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>>>>>;
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:59:19 -08:00
/// Timeouts applied while establishing a Geyser gRPC subscription.
#[derive(Debug, Clone)]
pub struct GrpcConnectionTimeouts {
    /// Connect timeout handed to `GeyserGrpcClient::connect_with_timeout`.
    pub connect_timeout: Duration,
    /// Request timeout handed to `GeyserGrpcClient::connect_with_timeout`.
    pub request_timeout: Duration,
    /// Upper bound on how long the subscribe call itself may take.
    pub subscribe_timeout: Duration,
}
2023-12-15 01:20:41 -08:00
#[derive(Clone, Debug)]
pub struct GrpcSourceConfig {
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
2023-12-15 02:59:19 -08:00
timeouts: Option<GrpcConnectionTimeouts>,
2023-12-15 01:20:41 -08:00
}
2023-12-19 03:44:49 -08:00
impl Display for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2023-12-19 05:19:27 -08:00
write!(
f,
"grpc_addr {}",
crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr)
)
2023-12-19 03:44:49 -08:00
}
}
2023-12-15 01:20:41 -08:00
impl GrpcSourceConfig {
2023-12-19 02:27:42 -08:00
/// Create a grpc source without tls and timeouts
2023-12-19 02:57:23 -08:00
pub fn new_simple(grpc_addr: String) -> Self {
2023-12-15 01:20:41 -08:00
Self {
grpc_addr,
2023-12-19 02:27:42 -08:00
grpc_x_token: None,
2023-12-15 01:20:41 -08:00
tls_config: None,
2023-12-15 02:59:19 -08:00
timeouts: None,
2023-12-15 01:20:41 -08:00
}
}
2023-12-19 02:27:42 -08:00
pub fn new(
2023-12-15 07:28:01 -08:00
grpc_addr: String,
grpc_x_token: Option<String>,
2023-12-19 02:27:42 -08:00
tls_config: Option<ClientTlsConfig>,
2023-12-15 07:28:01 -08:00
timeouts: GrpcConnectionTimeouts,
) -> Self {
Self {
grpc_addr,
grpc_x_token,
2023-12-19 02:27:42 -08:00
tls_config,
2023-12-15 07:28:01 -08:00
timeouts: Some(timeouts),
}
}
2023-12-15 01:20:41 -08:00
}
2023-12-19 02:27:42 -08:00
/// Counter of connection attempts: 1 is the initial connect, 2 and above are reconnects.
type Attempt = u32;
// wraps payload and status messages
2023-12-15 03:12:06 -08:00
pub enum Message {
2023-12-19 23:54:20 -08:00
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
2023-12-19 02:27:42 -08:00
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
2023-12-15 03:12:06 -08:00
}
2023-12-15 01:20:41 -08:00
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
2023-12-19 02:27:42 -08:00
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
Ready(Attempt, S),
WaitReconnect(Attempt),
2023-12-15 01:20:41 -08:00
}
2023-12-15 08:02:42 -08:00
/// Selects what the subscription asks Geyser for: full blocks or block metadata.
///
/// `Debug` is derived so this public type can be logged like the other public
/// configuration types in this module; both payload types are already formatted
/// with `{:?}` when the subscribe request is logged.
#[derive(Clone, Debug)]
pub enum GeyserFilter {
    /// Subscribe to blocks using the wrapped block filter.
    Blocks(SubscribeRequestFilterBlocks),
    /// Subscribe to block metadata using the wrapped filter.
    BlocksMeta(SubscribeRequestFilterBlocksMeta),
}
impl GeyserFilter {
2023-12-15 08:07:23 -08:00
pub fn blocks_and_txs() -> Self {
2023-12-15 08:02:42 -08:00
Self::Blocks(SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
})
}
2023-12-19 02:27:42 -08:00
pub fn blocks_filter(filter: SubscribeRequestFilterBlocks) -> Self {
Self::Blocks(filter)
}
// no parameters available
2023-12-15 08:02:42 -08:00
pub fn blocks_meta() -> Self {
2023-12-19 02:27:42 -08:00
Self::BlocksMeta(SubscribeRequestFilterBlocksMeta {})
2023-12-15 08:02:42 -08:00
}
}
2023-12-19 02:27:42 -08:00
// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
2023-12-15 01:20:41 -08:00
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
2023-12-15 08:02:42 -08:00
filter: GeyserFilter,
2023-12-15 01:20:41 -08:00
commitment_config: CommitmentConfig,
2023-12-15 03:12:06 -08:00
) -> impl Stream<Item = Message> {
2023-12-15 01:20:41 -08:00
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
}
2023-12-15 01:25:21 -08:00
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => {
2023-12-19 05:19:27 -08:00
panic!(
"unsupported commitment level {}",
commitment_config.commitment
)
}
2023-12-15 01:20:41 -08:00
};
let mut state = ConnectionState::NotConnected(0);
2023-12-15 01:20:41 -08:00
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
2023-12-19 02:57:23 -08:00
let the_stream = stream! {
2023-12-15 02:43:00 -08:00
loop {
2023-12-15 01:20:41 -08:00
let yield_value;
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
(state, yield_value) = match state {
2023-12-15 02:43:00 -08:00
ConnectionState::NotConnected(mut attempt) => {
2023-12-19 23:54:20 -08:00
attempt += 1;
2023-12-15 01:20:41 -08:00
let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
2023-12-15 02:59:19 -08:00
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
2023-12-15 08:02:42 -08:00
let filter = filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
2023-12-15 01:20:41 -08:00
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
2023-12-15 02:59:19 -08:00
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
2023-12-15 01:20:41 -08:00
let mut client = connect_result?;
let mut blocks_subs = HashMap::new();
let mut blocksmeta_subs = HashMap::new();
2023-12-15 08:02:42 -08:00
match filter {
GeyserFilter::Blocks(filter) => {
blocks_subs.insert(
"client".to_string(),
filter,
);
}
GeyserFilter::BlocksMeta(filter) => {
blocksmeta_subs.insert(
"client".to_string(),
filter,
);
}
}
2023-12-19 03:54:17 -08:00
debug!("Subscribe with blocks filter {:?} and blocksmeta filter {:?}", blocks_subs, blocksmeta_subs);
2023-12-20 22:40:18 -08:00
let subscribe_request = SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: blocksmeta_subs,
commitment: Some(commitment_level as i32),
accounts_data_slice: Default::default(),
ping: None,
};
2023-12-15 08:02:42 -08:00
let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
2023-12-15 02:59:19 -08:00
client
2023-12-20 22:40:18 -08:00
.subscribe_once2(subscribe_request))
2023-12-15 08:02:42 -08:00
.await;
2023-12-15 08:07:23 -08:00
// maybe not optimal
subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?
2023-12-15 01:20:41 -08:00
}
});
2023-12-15 08:02:42 -08:00
(ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt))
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:43:00 -08:00
ConnectionState::Connecting(attempt, connection_task) => {
2023-12-15 01:20:41 -08:00
let subscribe_result = connection_task.await;
match subscribe_result {
2023-12-15 08:02:42 -08:00
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
2023-12-15 01:20:41 -08:00
Ok(Err(geyser_error)) => {
// ATM we consider all errors recoverable
warn!("! subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
2023-12-15 08:02:42 -08:00
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
2023-12-15 01:20:41 -08:00
},
Err(geyser_grpc_task_error) => {
panic!("! task aborted - should not happen :{geyser_grpc_task_error}");
2023-12-15 01:20:41 -08:00
}
}
}
2023-12-15 02:43:00 -08:00
ConnectionState::Ready(attempt, mut geyser_stream) => {
2023-12-15 01:20:41 -08:00
2023-12-15 01:34:05 -08:00
match geyser_stream.next().await {
Some(Ok(update_message)) => {
2023-12-19 03:44:49 -08:00
trace!("> recv update message from {}", grpc_source);
2023-12-19 23:54:20 -08:00
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
2023-12-15 01:20:41 -08:00
}
2023-12-15 01:34:05 -08:00
Some(Err(tonic_status)) => {
// ATM we consider all errors recoverable
2023-12-19 03:44:49 -08:00
debug!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
2023-12-15 08:02:42 -08:00
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
2023-12-15 01:34:05 -08:00
}
None => {
// should not arrive here, Mean the stream close.
panic!("geyser stream closed on {} - retrying", grpc_source);
2023-12-15 01:34:05 -08:00
}
}
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:43:00 -08:00
ConnectionState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
sleep(Duration::from_secs_f32(backoff_secs)).await;
2023-12-15 08:02:42 -08:00
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
2023-12-15 01:20:41 -08:00
}
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
}; // -- match
2023-12-15 02:43:00 -08:00
2023-12-15 01:20:41 -08:00
yield yield_value
}
2023-12-19 02:57:23 -08:00
}; // -- stream!
the_stream
2023-12-15 01:20:41 -08:00
}