geyser-grpc-connector/src/grpc_subscription_autorecon...

354 lines
16 KiB
Rust
Raw Normal View History

2023-12-15 01:20:41 -08:00
use futures::{Stream, StreamExt};
2024-01-18 04:45:19 -08:00
use log::{debug, error, info, log, trace, warn, Level};
2023-12-19 05:19:27 -08:00
use solana_sdk::commitment_config::CommitmentConfig;
2023-12-15 01:20:41 -08:00
use std::collections::HashMap;
2024-01-16 23:31:22 -08:00
use std::fmt::{Debug, Display};
2024-01-18 23:43:47 -08:00
use std::future::Future;
2024-01-18 01:57:04 -08:00
use std::pin::Pin;
use std::time::Duration;
2024-01-18 23:25:05 -08:00
use anyhow::bail;
2024-01-18 23:43:47 -08:00
use tokio::sync::mpsc::error::{SendError, SendTimeoutError};
use tokio::sync::mpsc::Receiver;
2023-12-15 01:20:41 -08:00
use tokio::task::JoinHandle;
2024-01-18 01:57:04 -08:00
use tokio::time::error::Elapsed;
2024-01-18 23:43:47 -08:00
use tokio::time::{Instant, sleep, timeout, Timeout};
2024-01-17 23:45:55 -08:00
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
2024-01-18 04:45:19 -08:00
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
2023-12-22 08:05:48 -08:00
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
2023-12-15 01:20:41 -08:00
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
2024-01-17 23:45:55 -08:00
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
2024-01-18 01:57:04 -08:00
use yellowstone_grpc_proto::tonic::service::Interceptor;
2023-12-15 01:20:41 -08:00
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
2024-01-18 02:38:45 -08:00
use yellowstone_grpc_proto::tonic::{Code, Status};
2024-01-18 04:45:19 -08:00
use crate::GrpcSourceConfig;
2023-12-15 01:20:41 -08:00
2023-12-19 02:27:42 -08:00
type Attempt = u32;
// wraps payload and status messages
2024-01-18 01:09:32 -08:00
// clone is required by broacast channel
#[derive(Clone)]
2023-12-15 03:12:06 -08:00
pub enum Message {
2023-12-19 23:54:20 -08:00
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
2023-12-19 02:27:42 -08:00
// connect (attempt=1) or reconnect(attempt=2..)
Connecting(Attempt),
2023-12-15 03:12:06 -08:00
}
2024-01-18 23:50:08 -08:00
/// Outcome reported by the autoconnection task when it stops on its own.
#[derive(Clone, Debug)]
pub enum AutoconnectionError {
    /// A non-recoverable error was hit; the task logged the reason and gave up.
    AbortedFatalError,
}
2023-12-15 01:20:41 -08:00
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
2023-12-19 02:27:42 -08:00
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
Ready(Attempt, S),
WaitReconnect(Attempt),
2023-12-15 01:20:41 -08:00
}
2024-01-19 00:13:37 -08:00
/// Why the autoconnection task gave up; the reason selects the log line emitted
/// before the task returns `AutoconnectionError::AbortedFatalError`.
enum FatalErrorReason {
    /// The receiving side of the message channel has been dropped.
    DownstreamChannelClosed,
    /// Invalid connection settings (URI, metadata value or x-token length).
    ConfigurationError,
    /// Transport-level failure while connecting.
    NetworkError,
    /// Subscribing failed with an error that is not retried.
    SubscribeError,
    /// Catch-all for any other fatal condition.
    Misc,
}
2024-01-18 23:50:08 -08:00
enum State<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
2024-01-17 23:45:55 -08:00
NotConnected(Attempt),
2024-01-18 01:57:04 -08:00
Connected(Attempt, GeyserGrpcClient<F>),
Ready(Attempt, S),
// error states
2024-01-17 23:45:55 -08:00
RecoverableConnectionError(Attempt),
2024-01-18 23:25:05 -08:00
// non-recoverable error
2024-01-19 00:13:37 -08:00
FatalError(Attempt, FatalErrorReason),
2024-01-18 01:57:04 -08:00
WaitReconnect(Attempt),
2024-01-17 23:45:55 -08:00
}
2024-01-18 23:25:05 -08:00
/// return handler will exit on fatal error
2024-01-18 23:50:08 -08:00
pub fn create_geyser_autoconnection_task(
2024-01-17 23:45:55 -08:00
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
2024-01-18 23:50:08 -08:00
) -> (JoinHandle<Result<(), AutoconnectionError>>, Receiver<Message>) {
2024-01-18 23:43:47 -08:00
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_stream) = tokio::sync::mpsc::channel::<Message>(1);
2024-01-17 23:45:55 -08:00
2024-01-18 01:09:32 -08:00
let jh_geyser_task = tokio::spawn(async move {
2024-01-19 00:13:37 -08:00
let mut state = State::NotConnected(0);
2024-01-19 00:04:03 -08:00
let mut messages_forwarded = 0;
2024-01-17 23:45:55 -08:00
loop {
state = match state {
2024-01-19 00:13:37 -08:00
State::NotConnected(mut attempt) => {
2024-01-18 01:09:32 -08:00
attempt += 1;
2024-01-17 23:45:55 -08:00
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
2024-01-18 04:45:19 -08:00
log!(
if attempt > 1 {
Level::Warn
} else {
Level::Debug
},
"Connecting attempt #{} to {}",
attempt,
addr
);
2024-01-17 23:45:55 -08:00
let connect_result = GeyserGrpcClient::connect_with_timeout(
2024-01-18 04:45:19 -08:00
addr,
token,
config,
2024-01-17 23:45:55 -08:00
connect_timeout,
request_timeout,
2024-01-18 04:45:19 -08:00
false,
)
.await;
2024-01-18 01:09:32 -08:00
2024-01-18 01:57:04 -08:00
match connect_result {
2024-01-19 00:13:37 -08:00
Ok(client) => State::Connected(attempt, client),
Err(GeyserGrpcClientError::InvalidUri(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError),
Err(GeyserGrpcClientError::MetadataValueError(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError),
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => State::FatalError(attempt, FatalErrorReason::ConfigurationError),
2024-01-18 04:30:49 -08:00
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
2024-01-18 04:45:19 -08:00
warn!(
"! connect failed on {} - aborting: {:?}",
grpc_source, tonic_error
);
2024-01-19 00:13:37 -08:00
State::FatalError(attempt, FatalErrorReason::NetworkError)
2024-01-18 04:30:49 -08:00
}
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
2024-01-18 04:45:19 -08:00
warn!(
"! connect failed on {} - retrying: {:?}",
grpc_source, tonic_status
);
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt)
2024-01-18 04:30:49 -08:00
}
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
2024-01-18 04:45:19 -08:00
warn!(
"! connect failed with send error on {} - retrying: {:?}",
grpc_source, send_error
);
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt)
2024-01-18 01:57:04 -08:00
}
}
}
2024-01-19 00:13:37 -08:00
State::Connected(attempt, mut client) => {
2024-01-18 04:45:19 -08:00
let subscribe_timeout =
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
2024-01-18 01:57:04 -08:00
let subscribe_filter = subscribe_filter.clone();
2024-01-17 23:45:55 -08:00
debug!("Subscribe with filter {:?}", subscribe_filter);
2024-01-18 04:45:19 -08:00
let subscribe_result_timeout = timeout(
subscribe_timeout.unwrap_or(Duration::MAX),
client.subscribe_once2(subscribe_filter),
)
.await;
2024-01-17 23:45:55 -08:00
2024-01-18 04:38:15 -08:00
match subscribe_result_timeout {
Ok(subscribe_result) => {
match subscribe_result {
2024-01-19 00:13:37 -08:00
Ok(geyser_stream) => State::Ready(attempt, geyser_stream),
2024-01-18 04:38:15 -08:00
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("! subscribe failed on {} - retrying", grpc_source);
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt)
2024-01-18 04:38:15 -08:00
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("! subscribe failed on {} - retrying", grpc_source);
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt)
2024-01-18 04:38:15 -08:00
}
// non-recoverable
Err(unrecoverable_error) => {
2024-01-18 04:45:19 -08:00
error!(
"! subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
2024-01-19 00:13:37 -08:00
State::FatalError(attempt, FatalErrorReason::SubscribeError)
2024-01-18 04:38:15 -08:00
}
}
2024-01-18 01:57:04 -08:00
}
Err(_elapsed) => {
2024-01-18 04:45:19 -08:00
warn!(
"! subscribe failed with timeout on {} - retrying",
grpc_source
);
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt)
2024-01-17 23:45:55 -08:00
}
}
}
2024-01-19 00:13:37 -08:00
State::RecoverableConnectionError(attempt) => {
2024-01-17 23:45:55 -08:00
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
2024-01-18 04:45:19 -08:00
info!(
"! waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
2024-01-17 23:45:55 -08:00
sleep(Duration::from_secs_f32(backoff_secs)).await;
2024-01-19 00:13:37 -08:00
State::NotConnected(attempt)
2024-01-17 23:45:55 -08:00
}
2024-01-19 00:13:37 -08:00
State::FatalError(_attempt, reason) => {
match reason {
FatalErrorReason::DownstreamChannelClosed => {
warn!("! downstream closed - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
FatalErrorReason::ConfigurationError => {
warn!("! fatal configuration error - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
FatalErrorReason::NetworkError => {
warn!("! fatal network error - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
FatalErrorReason::SubscribeError => {
warn!("! fatal grpc subscribe error - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
FatalErrorReason::Misc => {
error!("! fatal misc error grpc connection - aborting");
return Err(AutoconnectionError::AbortedFatalError);
}
}
2024-01-17 23:45:55 -08:00
}
2024-01-18 23:50:08 -08:00
State::WaitReconnect(attempt) => {
2024-01-18 01:57:04 -08:00
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
2024-01-18 04:45:19 -08:00
info!(
"! waiting {} seconds, then reconnect to {}",
backoff_secs, grpc_source
);
2024-01-18 01:57:04 -08:00
sleep(Duration::from_secs_f32(backoff_secs)).await;
2024-01-18 23:50:08 -08:00
State::NotConnected(attempt)
2024-01-18 01:57:04 -08:00
}
2024-01-19 00:13:37 -08:00
State::Ready(attempt, mut geyser_stream) => {
2024-01-18 02:11:46 -08:00
'recv_loop: loop {
match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", grpc_source);
2024-01-18 23:25:05 -08:00
// TODO consider extract this
// backpressure - should'n we block here?
2024-01-18 23:43:47 -08:00
// TODO extract timeout param; TODO respect startup
// emit warning if message not received
2024-01-19 00:04:03 -08:00
// note: first send never blocks
let warning_threshold = if messages_forwarded == 1 { Duration::from_millis(3000) } else { Duration::from_millis(500) };
2024-01-18 23:43:47 -08:00
let started_at = Instant::now();
match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await {
Ok(()) => {
2024-01-19 00:04:03 -08:00
messages_forwarded += 1;
if messages_forwarded == 1 {
// note: first send never blocks - do not print time as this is a lie
trace!("queued first update message");
} else {
trace!("queued update message {} in {:.02}ms",
messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0);
}
2024-01-18 02:11:46 -08:00
continue 'recv_loop;
}
2024-01-18 23:50:08 -08:00
Err(SendTimeoutError::Timeout(the_message)) => {
2024-01-18 23:43:47 -08:00
warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis());
2024-01-18 23:50:08 -08:00
match sender.send(the_message).await {
2024-01-18 23:43:47 -08:00
Ok(()) => {
2024-01-19 00:04:03 -08:00
messages_forwarded += 1;
trace!("queued delayed update message {} in {:.02}ms",
messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0);
2024-01-18 23:43:47 -08:00
}
Err(_send_error ) => {
warn!("downstream receiver closed, message is lost - aborting");
2024-01-19 00:13:37 -08:00
break 'recv_loop State::FatalError(attempt, FatalErrorReason::DownstreamChannelClosed);
2024-01-18 23:43:47 -08:00
}
}
2024-01-18 02:11:46 -08:00
}
2024-01-18 23:43:47 -08:00
Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting");
2024-01-19 00:13:37 -08:00
break 'recv_loop State::FatalError(attempt, FatalErrorReason::DownstreamChannelClosed);
2024-01-18 23:43:47 -08:00
}
}
// {
// Ok(n_subscribers) => {
// trace!(
// "sent update message to {} subscribers (buffer={})",
// n_subscribers,
// sender.len()
// );
// continue 'recv_loop;
// }
// Err(SendError(_)) => {
// // note: error does not mean that future sends will also fail!
// trace!("no subscribers for update message");
// continue 'recv_loop;
// }
// };
2024-01-18 02:11:46 -08:00
}
Some(Err(tonic_status)) => {
2024-01-18 02:38:45 -08:00
// all tonic errors are recoverable
2024-01-18 02:11:46 -08:00
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
2024-01-18 23:50:08 -08:00
break 'recv_loop State::WaitReconnect(attempt);
2024-01-18 02:11:46 -08:00
}
2024-01-18 04:45:19 -08:00
None => {
2024-01-18 02:11:46 -08:00
warn!("geyser stream closed on {} - retrying", grpc_source);
2024-01-18 23:50:08 -08:00
break 'recv_loop State::WaitReconnect(attempt);
2024-01-18 02:11:46 -08:00
}
2024-01-18 01:09:32 -08:00
}
2024-01-18 02:38:45 -08:00
} // -- end loop
2024-01-18 01:09:32 -08:00
}
2024-01-17 23:45:55 -08:00
}
}
});
2024-01-18 02:11:46 -08:00
(jh_geyser_task, receiver_stream)
2024-01-17 23:45:55 -08:00
}
2024-01-16 23:31:22 -08:00
#[cfg(test)]
mod tests {
    use super::*;
    use crate::GrpcConnectionTimeouts;

    /// Expected rendering for both `Debug` and `Display`: only the address, never the x-token.
    const EXPECTED_REDACTED: &str = "grpc_addr http://localhost:1234";

    /// Source config for a local endpoint carrying a secret x-token and no TLS config.
    fn config_with_secret_token() -> GrpcSourceConfig {
        let timeouts = GrpcConnectionTimeouts {
            connect_timeout: Duration::from_secs(1),
            request_timeout: Duration::from_secs(2),
            subscribe_timeout: Duration::from_secs(3),
        };
        GrpcSourceConfig::new(
            "http://localhost:1234".to_string(),
            Some("my-secret".to_string()),
            None,
            timeouts,
        )
    }

    #[tokio::test]
    async fn test_debug_no_secrets() {
        let rendered = format!("{:?}", config_with_secret_token());
        assert_eq!(rendered, EXPECTED_REDACTED);
    }

    #[tokio::test]
    async fn test_display_no_secrets() {
        let rendered = format!("{}", config_with_secret_token());
        assert_eq!(rendered, EXPECTED_REDACTED);
    }
}