2024-03-11 07:45:28 -07:00
|
|
|
use crate::{Attempt, GrpcSourceConfig, Message};
|
2023-12-15 01:20:41 -08:00
|
|
|
use futures::{Stream, StreamExt};
|
2024-01-18 04:45:19 -08:00
|
|
|
use log::{debug, error, info, log, trace, warn, Level};
|
2023-12-19 04:03:07 -08:00
|
|
|
use std::time::Duration;
|
2024-01-26 10:16:21 -08:00
|
|
|
use tokio::sync::mpsc::error::SendTimeoutError;
|
2024-01-18 23:43:47 -08:00
|
|
|
use tokio::sync::mpsc::Receiver;
|
2024-01-26 10:16:21 -08:00
|
|
|
use tokio::task::AbortHandle;
|
|
|
|
use tokio::time::{sleep, timeout, Instant};
|
|
|
|
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
|
|
|
|
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
|
2024-01-18 01:57:04 -08:00
|
|
|
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
2024-01-26 10:16:21 -08:00
|
|
|
use yellowstone_grpc_proto::tonic::Status;
|
2023-12-15 01:20:41 -08:00
|
|
|
|
2024-01-26 10:16:21 -08:00
|
|
|
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
|
2023-12-19 02:27:42 -08:00
|
|
|
NotConnected(Attempt),
|
2024-03-11 07:27:32 -07:00
|
|
|
// connected but not subscribed
|
|
|
|
Connecting(Attempt, GeyserGrpcClient<F>),
|
|
|
|
Ready(S),
|
2024-01-26 10:16:21 -08:00
|
|
|
// error states
|
|
|
|
RecoverableConnectionError(Attempt),
|
|
|
|
// non-recoverable error
|
|
|
|
FatalError(Attempt, FatalErrorReason),
|
2023-12-19 02:27:42 -08:00
|
|
|
WaitReconnect(Attempt),
|
2023-12-15 01:20:41 -08:00
|
|
|
}
|
|
|
|
|
2024-01-19 00:13:37 -08:00
|
|
|
enum FatalErrorReason {
|
|
|
|
DownstreamChannelClosed,
|
|
|
|
ConfigurationError,
|
|
|
|
NetworkError,
|
|
|
|
SubscribeError,
|
|
|
|
}
|
|
|
|
|
2024-02-15 03:00:19 -08:00
|
|
|
pub fn create_geyser_autoconnection_task(
|
|
|
|
grpc_source: GrpcSourceConfig,
|
|
|
|
subscribe_filter: SubscribeRequest,
|
|
|
|
) -> (AbortHandle, Receiver<Message>) {
|
|
|
|
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
|
|
|
|
|
2024-03-11 07:36:56 -07:00
|
|
|
let abort_handle =
|
|
|
|
create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender);
|
2024-02-15 03:00:19 -08:00
|
|
|
|
|
|
|
(abort_handle, receiver_channel)
|
|
|
|
}
|
|
|
|
|
2024-02-15 05:56:10 -08:00
|
|
|
/// connect to grpc source performing autoconnect if required,
|
2024-01-19 00:23:55 -08:00
|
|
|
/// returns mpsc channel; task will abort on fatal error
|
2024-02-15 05:56:10 -08:00
|
|
|
/// will shut down when receiver is dropped
|
2024-02-15 03:00:19 -08:00
|
|
|
pub fn create_geyser_autoconnection_task_with_mpsc(
|
2024-01-17 23:45:55 -08:00
|
|
|
grpc_source: GrpcSourceConfig,
|
|
|
|
subscribe_filter: SubscribeRequest,
|
2024-02-15 05:56:10 -08:00
|
|
|
mpsc_downstream: tokio::sync::mpsc::Sender<Message>,
|
2024-02-15 03:00:19 -08:00
|
|
|
) -> AbortHandle {
|
2024-01-18 23:43:47 -08:00
|
|
|
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
|
2024-01-17 23:45:55 -08:00
|
|
|
|
2024-02-15 05:56:10 -08:00
|
|
|
// task will be aborted when downstream receiver gets dropped
|
2024-01-18 01:09:32 -08:00
|
|
|
let jh_geyser_task = tokio::spawn(async move {
|
2024-03-11 07:41:55 -07:00
|
|
|
let mut state = ConnectionState::NotConnected(1);
|
2024-01-19 00:04:03 -08:00
|
|
|
let mut messages_forwarded = 0;
|
2024-01-17 23:45:55 -08:00
|
|
|
|
|
|
|
loop {
|
|
|
|
state = match state {
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::NotConnected(attempt) => {
|
2024-01-17 23:45:55 -08:00
|
|
|
let addr = grpc_source.grpc_addr.clone();
|
|
|
|
let token = grpc_source.grpc_x_token.clone();
|
|
|
|
let config = grpc_source.tls_config.clone();
|
|
|
|
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
|
|
|
|
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
|
2024-01-18 04:45:19 -08:00
|
|
|
log!(
|
2024-03-11 07:41:55 -07:00
|
|
|
if attempt > 1 {
|
2024-01-18 04:45:19 -08:00
|
|
|
Level::Warn
|
|
|
|
} else {
|
|
|
|
Level::Debug
|
|
|
|
},
|
2024-03-11 07:27:32 -07:00
|
|
|
"Connecting attempt {} to {}",
|
2024-03-11 07:41:55 -07:00
|
|
|
attempt,
|
2024-01-18 04:45:19 -08:00
|
|
|
addr
|
|
|
|
);
|
2024-01-17 23:45:55 -08:00
|
|
|
let connect_result = GeyserGrpcClient::connect_with_timeout(
|
2024-01-18 04:45:19 -08:00
|
|
|
addr,
|
|
|
|
token,
|
|
|
|
config,
|
2024-01-17 23:45:55 -08:00
|
|
|
connect_timeout,
|
|
|
|
request_timeout,
|
2024-01-18 04:45:19 -08:00
|
|
|
false,
|
|
|
|
)
|
|
|
|
.await;
|
2024-01-18 01:09:32 -08:00
|
|
|
|
2024-01-18 01:57:04 -08:00
|
|
|
match connect_result {
|
2024-03-11 07:36:56 -07:00
|
|
|
Ok(client) => ConnectionState::Connecting(attempt, client),
|
2024-01-26 10:16:21 -08:00
|
|
|
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
|
2024-03-11 07:36:56 -07:00
|
|
|
attempt + 1,
|
|
|
|
FatalErrorReason::ConfigurationError,
|
|
|
|
),
|
|
|
|
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
|
|
|
|
ConnectionState::FatalError(
|
|
|
|
attempt + 1,
|
|
|
|
FatalErrorReason::ConfigurationError,
|
|
|
|
)
|
2024-01-19 02:34:45 -08:00
|
|
|
}
|
|
|
|
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
|
2024-03-11 07:36:56 -07:00
|
|
|
ConnectionState::FatalError(
|
|
|
|
attempt + 1,
|
|
|
|
FatalErrorReason::ConfigurationError,
|
|
|
|
)
|
2024-01-19 02:34:45 -08:00
|
|
|
}
|
2024-01-18 04:30:49 -08:00
|
|
|
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
warn!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"connect failed on {} - aborting: {:?}",
|
2024-01-18 04:45:19 -08:00
|
|
|
grpc_source, tonic_error
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::FatalError(attempt + 1, FatalErrorReason::NetworkError)
|
2024-01-18 04:30:49 -08:00
|
|
|
}
|
|
|
|
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
warn!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"connect failed on {} - retrying: {:?}",
|
2024-01-18 04:45:19 -08:00
|
|
|
grpc_source, tonic_status
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt + 1)
|
2024-01-18 04:30:49 -08:00
|
|
|
}
|
|
|
|
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
warn!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"connect failed with send error on {} - retrying: {:?}",
|
2024-01-18 04:45:19 -08:00
|
|
|
grpc_source, send_error
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt + 1)
|
2024-01-18 01:57:04 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::Connecting(attempt, mut client) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
let subscribe_timeout =
|
|
|
|
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
|
2024-01-18 01:57:04 -08:00
|
|
|
let subscribe_filter = subscribe_filter.clone();
|
2024-01-17 23:45:55 -08:00
|
|
|
debug!("Subscribe with filter {:?}", subscribe_filter);
|
|
|
|
|
2024-01-18 04:45:19 -08:00
|
|
|
let subscribe_result_timeout = timeout(
|
|
|
|
subscribe_timeout.unwrap_or(Duration::MAX),
|
|
|
|
client.subscribe_once2(subscribe_filter),
|
|
|
|
)
|
|
|
|
.await;
|
2024-01-17 23:45:55 -08:00
|
|
|
|
2024-01-18 04:38:15 -08:00
|
|
|
match subscribe_result_timeout {
|
|
|
|
Ok(subscribe_result) => {
|
|
|
|
match subscribe_result {
|
2024-03-11 07:36:56 -07:00
|
|
|
Ok(geyser_stream) => {
|
2024-03-11 07:41:55 -07:00
|
|
|
if attempt > 1 {
|
2024-03-11 07:50:43 -07:00
|
|
|
debug!(
|
|
|
|
"subscribed to {} after {} failed attempts",
|
|
|
|
grpc_source, attempt
|
|
|
|
);
|
2024-03-11 07:41:55 -07:00
|
|
|
}
|
2024-03-11 07:36:56 -07:00
|
|
|
ConnectionState::Ready(geyser_stream)
|
2024-03-11 07:50:43 -07:00
|
|
|
}
|
2024-01-18 04:38:15 -08:00
|
|
|
Err(GeyserGrpcClientError::TonicError(_)) => {
|
2024-03-11 07:50:43 -07:00
|
|
|
warn!(
|
|
|
|
"subscribe failed on {} after {} attempts - retrying",
|
|
|
|
grpc_source, attempt
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt + 1)
|
2024-01-18 04:38:15 -08:00
|
|
|
}
|
|
|
|
Err(GeyserGrpcClientError::TonicStatus(_)) => {
|
2024-03-11 07:50:43 -07:00
|
|
|
warn!(
|
|
|
|
"subscribe failed on {} after {} attempts - retrying",
|
|
|
|
grpc_source, attempt
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt + 1)
|
2024-01-18 04:38:15 -08:00
|
|
|
}
|
|
|
|
// non-recoverable
|
|
|
|
Err(unrecoverable_error) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
error!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"subscribe to {} failed with unrecoverable error: {}",
|
2024-01-18 04:45:19 -08:00
|
|
|
grpc_source, unrecoverable_error
|
|
|
|
);
|
2024-03-11 07:50:43 -07:00
|
|
|
ConnectionState::FatalError(
|
|
|
|
attempt + 1,
|
|
|
|
FatalErrorReason::SubscribeError,
|
|
|
|
)
|
2024-01-18 04:38:15 -08:00
|
|
|
}
|
|
|
|
}
|
2024-01-18 01:57:04 -08:00
|
|
|
}
|
|
|
|
Err(_elapsed) => {
|
2024-01-18 04:45:19 -08:00
|
|
|
warn!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"subscribe failed with timeout on {} - retrying",
|
2024-01-18 04:45:19 -08:00
|
|
|
grpc_source
|
|
|
|
);
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt + 1)
|
2024-01-17 23:45:55 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-01-26 10:16:21 -08:00
|
|
|
ConnectionState::RecoverableConnectionError(attempt) => {
|
2024-01-17 23:45:55 -08:00
|
|
|
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
2024-01-18 04:45:19 -08:00
|
|
|
info!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"waiting {} seconds, then reconnect to {}",
|
2024-01-18 04:45:19 -08:00
|
|
|
backoff_secs, grpc_source
|
|
|
|
);
|
2024-01-17 23:45:55 -08:00
|
|
|
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
2024-01-26 10:16:21 -08:00
|
|
|
ConnectionState::NotConnected(attempt)
|
2024-01-17 23:45:55 -08:00
|
|
|
}
|
2024-01-26 10:16:21 -08:00
|
|
|
ConnectionState::FatalError(_attempt, reason) => match reason {
|
2024-01-19 02:34:45 -08:00
|
|
|
FatalErrorReason::DownstreamChannelClosed => {
|
2024-01-23 08:50:10 -08:00
|
|
|
warn!("downstream closed - aborting");
|
2024-01-19 02:34:45 -08:00
|
|
|
return;
|
2024-01-19 00:13:37 -08:00
|
|
|
}
|
2024-01-19 02:34:45 -08:00
|
|
|
FatalErrorReason::ConfigurationError => {
|
2024-01-23 08:50:10 -08:00
|
|
|
warn!("fatal configuration error - aborting");
|
2024-01-19 02:34:45 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
FatalErrorReason::NetworkError => {
|
2024-01-23 08:50:10 -08:00
|
|
|
warn!("fatal network error - aborting");
|
2024-01-19 02:34:45 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
FatalErrorReason::SubscribeError => {
|
2024-01-23 08:50:10 -08:00
|
|
|
warn!("fatal grpc subscribe error - aborting");
|
2024-01-19 02:34:45 -08:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
},
|
2024-01-26 10:16:21 -08:00
|
|
|
ConnectionState::WaitReconnect(attempt) => {
|
2024-01-18 01:57:04 -08:00
|
|
|
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
2024-01-18 04:45:19 -08:00
|
|
|
info!(
|
2024-01-31 09:53:07 -08:00
|
|
|
"waiting {} seconds, then reconnect to {}",
|
2024-01-18 04:45:19 -08:00
|
|
|
backoff_secs, grpc_source
|
|
|
|
);
|
2024-01-18 01:57:04 -08:00
|
|
|
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
2024-01-26 10:16:21 -08:00
|
|
|
ConnectionState::NotConnected(attempt)
|
2024-01-18 01:57:04 -08:00
|
|
|
}
|
2024-03-11 07:27:32 -07:00
|
|
|
ConnectionState::Ready(mut geyser_stream) => {
|
2024-01-26 10:16:21 -08:00
|
|
|
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
|
2024-01-18 02:11:46 -08:00
|
|
|
'recv_loop: loop {
|
2024-01-26 10:25:39 -08:00
|
|
|
match timeout(
|
|
|
|
receive_timeout.unwrap_or(Duration::MAX),
|
|
|
|
geyser_stream.next(),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
2024-01-26 10:16:21 -08:00
|
|
|
Ok(Some(Ok(update_message))) => {
|
2024-01-18 02:11:46 -08:00
|
|
|
trace!("> recv update message from {}", grpc_source);
|
2024-01-26 10:16:21 -08:00
|
|
|
// note: first send never blocks as the mpsc channel has capacity 1
|
2024-01-19 02:34:45 -08:00
|
|
|
let warning_threshold = if messages_forwarded == 1 {
|
|
|
|
Duration::from_millis(3000)
|
|
|
|
} else {
|
|
|
|
Duration::from_millis(500)
|
|
|
|
};
|
2024-01-18 23:43:47 -08:00
|
|
|
let started_at = Instant::now();
|
2024-02-15 05:56:10 -08:00
|
|
|
match mpsc_downstream
|
2024-01-19 02:34:45 -08:00
|
|
|
.send_timeout(
|
|
|
|
Message::GeyserSubscribeUpdate(Box::new(update_message)),
|
|
|
|
warning_threshold,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
2024-01-18 23:43:47 -08:00
|
|
|
Ok(()) => {
|
2024-01-19 00:04:03 -08:00
|
|
|
messages_forwarded += 1;
|
|
|
|
if messages_forwarded == 1 {
|
|
|
|
// note: first send never blocks - do not print time as this is a lie
|
|
|
|
trace!("queued first update message");
|
|
|
|
} else {
|
2024-01-19 02:34:45 -08:00
|
|
|
trace!(
|
|
|
|
"queued update message {} in {:.02}ms",
|
|
|
|
messages_forwarded,
|
|
|
|
started_at.elapsed().as_secs_f32() * 1000.0
|
|
|
|
);
|
2024-01-19 00:04:03 -08:00
|
|
|
}
|
2024-01-18 02:11:46 -08:00
|
|
|
continue 'recv_loop;
|
|
|
|
}
|
2024-01-18 23:50:08 -08:00
|
|
|
Err(SendTimeoutError::Timeout(the_message)) => {
|
2024-02-15 03:00:19 -08:00
|
|
|
warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis());
|
2024-01-18 23:43:47 -08:00
|
|
|
|
2024-02-15 05:56:10 -08:00
|
|
|
match mpsc_downstream.send(the_message).await {
|
2024-01-18 23:43:47 -08:00
|
|
|
Ok(()) => {
|
2024-01-19 00:04:03 -08:00
|
|
|
messages_forwarded += 1;
|
2024-01-19 02:34:45 -08:00
|
|
|
trace!(
|
|
|
|
"queued delayed update message {} in {:.02}ms",
|
|
|
|
messages_forwarded,
|
|
|
|
started_at.elapsed().as_secs_f32() * 1000.0
|
|
|
|
);
|
2024-01-18 23:43:47 -08:00
|
|
|
}
|
2024-01-19 02:34:45 -08:00
|
|
|
Err(_send_error) => {
|
2024-01-18 23:43:47 -08:00
|
|
|
warn!("downstream receiver closed, message is lost - aborting");
|
2024-03-11 07:36:56 -07:00
|
|
|
break 'recv_loop ConnectionState::FatalError(
|
|
|
|
0,
|
|
|
|
FatalErrorReason::DownstreamChannelClosed,
|
|
|
|
);
|
2024-01-18 23:43:47 -08:00
|
|
|
}
|
|
|
|
}
|
2024-01-18 02:11:46 -08:00
|
|
|
}
|
2024-01-18 23:43:47 -08:00
|
|
|
Err(SendTimeoutError::Closed(_)) => {
|
|
|
|
warn!("downstream receiver closed - aborting");
|
2024-03-11 07:36:56 -07:00
|
|
|
break 'recv_loop ConnectionState::FatalError(
|
|
|
|
0,
|
|
|
|
FatalErrorReason::DownstreamChannelClosed,
|
|
|
|
);
|
2024-01-18 23:43:47 -08:00
|
|
|
}
|
|
|
|
}
|
2024-01-18 02:11:46 -08:00
|
|
|
}
|
2024-01-26 10:16:21 -08:00
|
|
|
Ok(Some(Err(tonic_status))) => {
|
2024-01-18 02:38:45 -08:00
|
|
|
// all tonic errors are recoverable
|
2024-01-23 08:50:10 -08:00
|
|
|
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
|
2024-03-11 07:41:55 -07:00
|
|
|
break 'recv_loop ConnectionState::WaitReconnect(1);
|
2024-01-18 02:11:46 -08:00
|
|
|
}
|
2024-01-26 10:16:21 -08:00
|
|
|
Ok(None) => {
|
2024-01-18 02:11:46 -08:00
|
|
|
warn!("geyser stream closed on {} - retrying", grpc_source);
|
2024-03-11 07:41:55 -07:00
|
|
|
break 'recv_loop ConnectionState::WaitReconnect(1);
|
2024-01-26 10:16:21 -08:00
|
|
|
}
|
|
|
|
Err(_elapsed) => {
|
|
|
|
warn!("timeout on {} - retrying", grpc_source);
|
2024-03-11 07:41:55 -07:00
|
|
|
break 'recv_loop ConnectionState::WaitReconnect(1);
|
2024-01-18 02:11:46 -08:00
|
|
|
}
|
2024-03-11 07:27:32 -07:00
|
|
|
} // -- END match
|
2024-01-26 10:32:37 -08:00
|
|
|
} // -- END receive loop
|
2024-01-18 01:09:32 -08:00
|
|
|
}
|
2024-01-26 10:32:37 -08:00
|
|
|
} // -- END match
|
|
|
|
} // -- endless state loop
|
2024-01-17 23:45:55 -08:00
|
|
|
});
|
|
|
|
|
2024-02-15 03:00:19 -08:00
|
|
|
jh_geyser_task.abort_handle()
|
2024-01-17 23:45:55 -08:00
|
|
|
}
|
|
|
|
|
2024-01-16 23:31:22 -08:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2024-01-19 02:34:45 -08:00
|
|
|
use crate::GrpcConnectionTimeouts;
|
2024-01-16 23:31:22 -08:00
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_debug_no_secrets() {
|
|
|
|
let timeout_config = GrpcConnectionTimeouts {
|
|
|
|
connect_timeout: Duration::from_secs(1),
|
|
|
|
request_timeout: Duration::from_secs(2),
|
|
|
|
subscribe_timeout: Duration::from_secs(3),
|
2024-01-26 10:25:39 -08:00
|
|
|
receive_timeout: Duration::from_secs(3),
|
2024-01-16 23:31:22 -08:00
|
|
|
};
|
|
|
|
assert_eq!(
|
|
|
|
format!(
|
|
|
|
"{:?}",
|
|
|
|
GrpcSourceConfig::new(
|
|
|
|
"http://localhost:1234".to_string(),
|
|
|
|
Some("my-secret".to_string()),
|
|
|
|
None,
|
|
|
|
timeout_config
|
|
|
|
)
|
|
|
|
),
|
|
|
|
"grpc_addr http://localhost:1234"
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_display_no_secrets() {
|
|
|
|
let timeout_config = GrpcConnectionTimeouts {
|
|
|
|
connect_timeout: Duration::from_secs(1),
|
|
|
|
request_timeout: Duration::from_secs(2),
|
|
|
|
subscribe_timeout: Duration::from_secs(3),
|
2024-01-26 10:25:39 -08:00
|
|
|
receive_timeout: Duration::from_secs(3),
|
2024-01-16 23:31:22 -08:00
|
|
|
};
|
|
|
|
assert_eq!(
|
|
|
|
format!(
|
|
|
|
"{}",
|
|
|
|
GrpcSourceConfig::new(
|
|
|
|
"http://localhost:1234".to_string(),
|
|
|
|
Some("my-secret".to_string()),
|
|
|
|
None,
|
|
|
|
timeout_config
|
|
|
|
)
|
|
|
|
),
|
|
|
|
"grpc_addr http://localhost:1234"
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|