2023-12-15 03:12:06 -08:00
|
|
|
use crate::grpc_subscription_autoreconnect::Message::Reconnecting;
|
2023-12-15 01:20:41 -08:00
|
|
|
use async_stream::stream;
|
|
|
|
use futures::{Stream, StreamExt};
|
2023-12-15 02:59:19 -08:00
|
|
|
use log::{debug, info, log, trace, warn, Level};
|
2023-12-15 01:20:41 -08:00
|
|
|
use solana_sdk::commitment_config::CommitmentConfig;
|
|
|
|
use std::collections::HashMap;
|
2023-12-15 07:11:01 -08:00
|
|
|
use std::ops::{Add, Sub};
|
2023-12-15 01:25:21 -08:00
|
|
|
use std::pin::Pin;
|
2023-12-15 02:43:00 -08:00
|
|
|
use std::sync::atomic::{AtomicI32, Ordering};
|
2023-12-15 07:11:01 -08:00
|
|
|
use std::time::{Duration, Instant};
|
2023-12-15 01:20:41 -08:00
|
|
|
use tokio::task::JoinHandle;
|
2023-12-15 07:11:01 -08:00
|
|
|
use tokio::time::{sleep, sleep_until, timeout};
|
2023-12-15 01:25:21 -08:00
|
|
|
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
|
|
|
|
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate};
|
2023-12-15 01:20:41 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
|
|
|
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
|
|
|
use yellowstone_grpc_proto::tonic::{async_trait, Status};
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
trait GrpcConnectionFactory: Clone {
|
|
|
|
// async fn connect() -> GeyserGrpcClientResult<impl Stream<Item=Result<SubscribeUpdate, Status>>+Sized>;
|
2023-12-15 01:25:21 -08:00
|
|
|
async fn connect_and_subscribe(
|
|
|
|
&self,
|
|
|
|
) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>>>>>;
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
|
|
|
|
2023-12-15 02:59:19 -08:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct GrpcConnectionTimeouts {
|
|
|
|
pub connect_timeout: Duration,
|
|
|
|
pub request_timeout: Duration,
|
|
|
|
pub subscribe_timeout: Duration,
|
|
|
|
}
|
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct GrpcSourceConfig {
|
|
|
|
// symbolic name used in logs
|
|
|
|
pub label: String,
|
|
|
|
grpc_addr: String,
|
|
|
|
grpc_x_token: Option<String>,
|
|
|
|
tls_config: Option<ClientTlsConfig>,
|
2023-12-15 02:59:19 -08:00
|
|
|
timeouts: Option<GrpcConnectionTimeouts>,
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl GrpcSourceConfig {
|
|
|
|
pub fn new(label: String, grpc_addr: String, grpc_x_token: Option<String>) -> Self {
|
|
|
|
Self {
|
|
|
|
label,
|
|
|
|
grpc_addr,
|
|
|
|
grpc_x_token,
|
|
|
|
tls_config: None,
|
2023-12-15 02:59:19 -08:00
|
|
|
timeouts: None,
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-15 03:12:06 -08:00
|
|
|
pub enum Message {
|
|
|
|
GeyserSubscribeUpdate(SubscribeUpdate),
|
|
|
|
Reconnecting,
|
|
|
|
}
|
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
|
|
|
NotConnected,
|
|
|
|
Connecting(JoinHandle<GeyserGrpcClientResult<S>>),
|
|
|
|
Ready(S),
|
|
|
|
WaitReconnect,
|
|
|
|
}
|
|
|
|
|
|
|
|
// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate
|
|
|
|
// note: stream never terminates
|
|
|
|
pub fn create_geyser_reconnecting_stream(
|
|
|
|
grpc_source: GrpcSourceConfig,
|
|
|
|
commitment_config: CommitmentConfig,
|
2023-12-15 03:12:06 -08:00
|
|
|
) -> impl Stream<Item = Message> {
|
2023-12-15 01:20:41 -08:00
|
|
|
let label = grpc_source.label.clone();
|
|
|
|
|
|
|
|
// solana_sdk -> yellowstone
|
|
|
|
let commitment_level = match commitment_config.commitment {
|
2023-12-15 01:25:21 -08:00
|
|
|
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
|
|
|
|
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
|
|
|
|
}
|
|
|
|
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
|
|
|
|
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
|
|
|
|
}
|
2023-12-15 01:20:41 -08:00
|
|
|
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut state = ConnectionState::NotConnected;
|
2023-12-15 02:59:19 -08:00
|
|
|
let connection_attempts = AtomicI32::new(1);
|
2023-12-15 01:20:41 -08:00
|
|
|
|
|
|
|
// in case of cancellation, we restart from here:
|
|
|
|
// thus we want to keep the progression in a state object outside the stream! makro
|
|
|
|
stream! {
|
2023-12-15 02:43:00 -08:00
|
|
|
loop {
|
2023-12-15 01:20:41 -08:00
|
|
|
let yield_value;
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
(state, yield_value) = match state {
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
ConnectionState::NotConnected => {
|
|
|
|
|
|
|
|
let connection_task = tokio::spawn({
|
|
|
|
let addr = grpc_source.grpc_addr.clone();
|
|
|
|
let token = grpc_source.grpc_x_token.clone();
|
|
|
|
let config = grpc_source.tls_config.clone();
|
2023-12-15 02:59:19 -08:00
|
|
|
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
|
|
|
|
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
|
|
|
|
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
|
2023-12-15 01:20:41 -08:00
|
|
|
// let (block_filter, blockmeta_filter) = blocks_filters.clone();
|
2023-12-15 02:59:19 -08:00
|
|
|
let attempt = connection_attempts.fetch_add(1, Ordering::Relaxed);
|
|
|
|
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
|
2023-12-15 01:20:41 -08:00
|
|
|
async move {
|
|
|
|
|
|
|
|
let connect_result = GeyserGrpcClient::connect_with_timeout(
|
2023-12-15 02:59:19 -08:00
|
|
|
addr, token, config,
|
|
|
|
connect_timeout,
|
|
|
|
request_timeout,
|
|
|
|
false)
|
|
|
|
.await;
|
2023-12-15 01:20:41 -08:00
|
|
|
let mut client = connect_result?;
|
|
|
|
|
|
|
|
// TODO make filter configurable for caller
|
|
|
|
let mut blocks_subs = HashMap::new();
|
|
|
|
blocks_subs.insert(
|
|
|
|
"client".to_string(),
|
|
|
|
SubscribeRequestFilterBlocks {
|
|
|
|
account_include: Default::default(),
|
|
|
|
include_transactions: Some(true),
|
|
|
|
include_accounts: Some(false),
|
|
|
|
include_entries: Some(false),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
let mut blocksmeta_subs = HashMap::new();
|
|
|
|
blocksmeta_subs.insert(
|
|
|
|
"client".to_string(),
|
|
|
|
SubscribeRequestFilterBlocksMeta {},
|
|
|
|
);
|
|
|
|
|
|
|
|
// Connected;
|
2023-12-15 01:25:21 -08:00
|
|
|
|
|
|
|
|
2023-12-15 02:59:19 -08:00
|
|
|
timeout(subscribe_timeout.unwrap_or(Duration::MAX),
|
|
|
|
client
|
|
|
|
.subscribe_once(
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
blocks_subs,
|
|
|
|
blocksmeta_subs,
|
|
|
|
Some(commitment_level),
|
|
|
|
Default::default(),
|
|
|
|
None,
|
|
|
|
))
|
|
|
|
.await.expect("timeout on subscribe_once")
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::Connecting(connection_task), Message::Reconnecting)
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
ConnectionState::Connecting(connection_task) => {
|
|
|
|
let subscribe_result = connection_task.await;
|
|
|
|
|
|
|
|
match subscribe_result {
|
2023-12-15 03:12:06 -08:00
|
|
|
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), Message::Reconnecting),
|
2023-12-15 01:20:41 -08:00
|
|
|
Ok(Err(geyser_error)) => {
|
|
|
|
// TODO identify non-recoverable errors and cancel stream
|
|
|
|
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::WaitReconnect, Message::Reconnecting)
|
2023-12-15 01:20:41 -08:00
|
|
|
},
|
|
|
|
Err(geyser_grpc_task_error) => {
|
|
|
|
panic!("Task aborted - should not happen :{geyser_grpc_task_error}");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
ConnectionState::Ready(mut geyser_stream) => {
|
|
|
|
|
2023-12-15 01:34:05 -08:00
|
|
|
match geyser_stream.next().await {
|
|
|
|
Some(Ok(update_message)) => {
|
2023-12-15 02:43:00 -08:00
|
|
|
trace!("> recv update message from {}", label);
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::Ready(geyser_stream), Message::GeyserSubscribeUpdate(update_message))
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
2023-12-15 01:34:05 -08:00
|
|
|
Some(Err(tonic_status)) => {
|
|
|
|
// TODO identify non-recoverable errors and cancel stream
|
|
|
|
debug!("! error on {} - retrying: {:?}", label, tonic_status);
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::WaitReconnect, Message::Reconnecting)
|
2023-12-15 01:34:05 -08:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
//TODO should not arrive. Mean the stream close.
|
|
|
|
warn!("! geyser stream closed on {} - retrying", label);
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::WaitReconnect, Message::Reconnecting)
|
2023-12-15 01:34:05 -08:00
|
|
|
}
|
|
|
|
}
|
2023-12-15 01:20:41 -08:00
|
|
|
|
|
|
|
}
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
ConnectionState::WaitReconnect => {
|
2023-12-15 02:43:00 -08:00
|
|
|
info!("Waiting a bit, then connect to {}", label);
|
2023-12-15 01:20:41 -08:00
|
|
|
sleep(Duration::from_secs(1)).await;
|
2023-12-15 03:12:06 -08:00
|
|
|
(ConnectionState::NotConnected, Message::Reconnecting)
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
}; // -- match
|
2023-12-15 02:43:00 -08:00
|
|
|
|
2023-12-15 01:20:41 -08:00
|
|
|
yield yield_value
|
|
|
|
}
|
|
|
|
|
|
|
|
} // -- stream!
|
|
|
|
}
|