remove label

This commit is contained in:
GroovieGermanikus 2023-12-19 11:57:23 +01:00
parent 4aeeb45027
commit 342303a9ec
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 26 additions and 34 deletions

View File

@ -95,11 +95,11 @@ pub async fn main() {
subscribe_timeout: Duration::from_secs(5),
};
let green_config = GrpcSourceConfig::new("greensource".to_string(), grpc_addr_green, grpc_x_token_green, timeouts.clone());
let green_config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let blue_config =
GrpcSourceConfig::new("bluesource".to_string(), grpc_addr_blue, grpc_x_token_blue, timeouts.clone());
GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone());
let toxiproxy_config =
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_toxiproxy, None, timeouts.clone());
GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone());
if subscribe_blocks {
info!("Write Block stream..");

View File

@ -28,8 +28,6 @@ pub struct GrpcConnectionTimeouts {
#[derive(Clone, Debug)]
pub struct GrpcSourceConfig {
// symbolic name used in logs
pub label: String,
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
@ -38,9 +36,8 @@ pub struct GrpcSourceConfig {
impl GrpcSourceConfig {
/// Create a grpc source without tls and timeouts
pub fn new_simple(label: String, grpc_addr: String) -> Self {
pub fn new_simple(grpc_addr: String) -> Self {
Self {
label,
grpc_addr,
grpc_x_token: None,
tls_config: None,
@ -48,14 +45,12 @@ impl GrpcSourceConfig {
}
}
pub fn new(
label: String,
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: GrpcConnectionTimeouts,
) -> Self {
Self {
label,
grpc_addr,
grpc_x_token,
tls_config,
@ -112,7 +107,6 @@ pub fn create_geyser_reconnecting_stream(
filter: GeyserFilter,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Message> {
let label = grpc_source.label.clone();
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
@ -129,7 +123,7 @@ pub fn create_geyser_reconnecting_stream(
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
stream! {
let the_stream = stream! {
loop {
let yield_value;
@ -204,7 +198,7 @@ pub fn create_geyser_reconnecting_stream(
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
Ok(Err(geyser_error)) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_error);
warn!("Subscribe failed - retrying: {:?}", geyser_error);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
},
Err(geyser_grpc_task_error) => {
@ -218,17 +212,17 @@ pub fn create_geyser_reconnecting_stream(
match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", label);
trace!("> recv update message");
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(update_message))
}
Some(Err(tonic_status)) => {
// TODO identify non-recoverable errors and cancel stream
debug!("! error on {} - retrying: {:?}", label, tonic_status);
debug!("! error - retrying: {:?}", tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
None => {
//TODO should not arrive. Mean the stream close.
warn!("! geyser stream closed on {} - retrying", label);
warn!("! geyser stream closed - retrying");
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
}
@ -237,7 +231,7 @@ pub fn create_geyser_reconnecting_stream(
ConnectionState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("Waiting {} seconds, then connect to {}", backoff_secs, label);
info!("Waiting {} seconds, then reconnect", backoff_secs);
sleep(Duration::from_secs_f32(backoff_secs)).await;
(ConnectionState::NotConnected(attempt), Message::Connecting(attempt))
}
@ -247,5 +241,7 @@ pub fn create_geyser_reconnecting_stream(
yield yield_value
}
} // -- stream!
}; // -- stream!
the_stream
}

View File

@ -1,3 +1,4 @@
use std::pin::Pin;
use crate::grpc_subscription_autoreconnect::Message;
use crate::grpc_subscription_autoreconnect::Message::GeyserSubscribeUpdate;
use async_stream::stream;
@ -21,31 +22,26 @@ struct TaggedMessage {
}
/// use streams created by ``create_geyser_reconnecting_stream``
/// note: this is agnostic to the type of the stream
pub fn create_multiplex<M>(
/// this is agnostic to the type of the stream
/// CAUTION: do not try to use with commitment level "processed" as this will form trees (forks) and not a sequence
pub fn create_multiplex<E>(
grpc_source_streams: Vec<impl Stream<Item = Message>>,
extractor: M,
) -> impl Stream<Item = M::Target>
extractor: E,
) -> impl Stream<Item = E::Target>
where
M: FromYellowstoneExtractor,
E: FromYellowstoneExtractor,
{
if grpc_source_streams.is_empty() {
panic!("Must have at least one source");
panic!("Must have at least one grpc source");
}
info!(
"Starting multiplexer with {} sources",
grpc_source_streams.len(),
);
// use merge
// let mut futures = futures::stream::SelectAll::new();
info!("Starting multiplexer with {} sources", grpc_source_streams.len());
let mut streams = vec![];
let mut idx = 0;
for grpc_source in grpc_source_streams {
let tagged = grpc_source.map(move |msg| TaggedMessage {
for grpc_stream in grpc_source_streams {
let tagged = grpc_stream.map(move |msg| TaggedMessage {
stream_idx: idx,
payload: msg,
});
@ -58,13 +54,13 @@ where
extract_payload_from_geyser_updates(merged_streams, extractor)
}
fn extract_payload_from_geyser_updates<E>(merged_streams: impl Stream<Item = TaggedMessage>, extractor: E) -> impl Stream<Item = E::Target>
fn extract_payload_from_geyser_updates<E>(merged_stream: impl Stream<Item = TaggedMessage>, extractor: E) -> impl Stream<Item = E::Target>
where
E: FromYellowstoneExtractor,
{
let mut tip: Slot = 0;
stream! {
for await TaggedMessage {stream_idx, payload} in merged_streams {
for await TaggedMessage {stream_idx, payload} in merged_stream {
match payload {
GeyserSubscribeUpdate(update) => {
// take only the update messages we want