This commit is contained in:
GroovieGermanikus 2024-03-11 15:50:43 +01:00
parent 26bc7a3683
commit 37ed49c210
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 28 additions and 32 deletions

View File

@ -1,4 +1,3 @@
use futures::StreamExt;
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;

View File

@ -1,4 +1,4 @@
use futures::{Stream, StreamExt};
use futures::Stream;
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
@ -33,20 +33,6 @@ use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
fn start_example_block_consumer(
multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static,
) {
tokio::spawn(async move {
let mut block_stream = pin!(multiplex_stream);
while let Some(block) = block_stream.next().await {
info!(
"emitted block #{}@{} from multiplexer",
block.slot, block.commitment_config.commitment
);
}
});
}
fn start_example_blockmeta_consumer(mut multiplex_channel: Receiver<Message>) {
tokio::spawn(async move {
loop {
@ -113,9 +99,6 @@ pub async fn main() {
tracing_subscriber::fmt::init();
// console_subscriber::init();
let subscribe_blocks = true;
let subscribe_blockmeta = false;
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
let grpc_addr_blue = env::var("GRPC_ADDR2").expect("need grpc url for blue");
@ -148,19 +131,19 @@ pub async fn main() {
GrpcSourceConfig::new(grpc_addr_blue, grpc_x_token_blue, None, timeouts.clone());
let toxiproxy_config = GrpcSourceConfig::new(grpc_addr_toxiproxy, None, None, timeouts.clone());
let (autoconnect_tx, mut blockmeta_rx) = tokio::sync::mpsc::channel(10);
let (autoconnect_tx, blockmeta_rx) = tokio::sync::mpsc::channel(10);
info!("Write BlockMeta stream..");
let green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
);
let blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc(
blue_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),
);
let toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc(
toxiproxy_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
autoconnect_tx.clone(),

View File

@ -14,6 +14,7 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
#[allow(dead_code)]
fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
) {

View File

@ -1,7 +1,4 @@
use log::{debug, info, warn};
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;
use log::debug;
/// usage: see plug_pattern test
pub fn spawn_broadcast_channel_plug<T: Send + 'static>(
@ -41,6 +38,10 @@ pub fn spawn_plugger_mpcs_to_broadcast<T: Send + 'static>(
#[cfg(test)]
mod tests {
use super::*;
use log::{info, warn};
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::time::{sleep, timeout};
#[tokio::test]

View File

@ -143,16 +143,25 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
match subscribe_result {
Ok(geyser_stream) => {
if attempt > 1 {
debug!("subscribed to {} after {} failed attempts", grpc_source, attempt);
debug!(
"subscribed to {} after {} failed attempts",
grpc_source, attempt
);
}
ConnectionState::Ready(geyser_stream)
},
}
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt);
warn!(
"subscribe failed on {} after {} attempts - retrying",
grpc_source, attempt
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt);
warn!(
"subscribe failed on {} after {} attempts - retrying",
grpc_source, attempt
);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
// non-recoverable
@ -161,7 +170,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError)
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::SubscribeError,
)
}
}
}