test/example for autoconnect
This commit is contained in:
parent
cf0c83b0c5
commit
9dec39dfeb
|
@ -0,0 +1,145 @@
|
|||
use futures::{Stream, StreamExt};
|
||||
use log::info;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::env;
|
||||
use std::pin::pin;
|
||||
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
|
||||
create_geyser_reconnecting_stream,
|
||||
};
|
||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
|
||||
create_multiplexed_stream, FromYellowstoneExtractor,
|
||||
};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use tracing::warn;
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
||||
use yellowstone_grpc_proto::prost::Message as _;
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message};
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
|
||||
|
||||
fn start_example_blockmini_consumer(
|
||||
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let mut blockmeta_stream = pin!(multiplex_stream);
|
||||
while let Some(mini) = blockmeta_stream.next().await {
|
||||
info!(
|
||||
"emitted block mini #{}@{} with {} bytes from multiplexer",
|
||||
mini.slot, mini.commitment_config.commitment, mini.blocksize
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub struct BlockMini {
|
||||
pub blocksize: usize,
|
||||
pub slot: Slot,
|
||||
pub commitment_config: CommitmentConfig,
|
||||
}
|
||||
|
||||
struct BlockMiniExtractor(CommitmentConfig);
|
||||
|
||||
impl FromYellowstoneExtractor for BlockMiniExtractor {
|
||||
type Target = BlockMini;
|
||||
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
|
||||
match update.update_oneof {
|
||||
Some(UpdateOneof::Block(update_block_message)) => {
|
||||
let blocksize = update_block_message.encoded_len();
|
||||
let slot = update_block_message.slot;
|
||||
let mini = BlockMini {
|
||||
blocksize,
|
||||
slot,
|
||||
commitment_config: self.0,
|
||||
};
|
||||
Some((slot, mini))
|
||||
}
|
||||
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
|
||||
let blocksize = update_blockmeta_message.encoded_len();
|
||||
let slot = update_blockmeta_message.slot;
|
||||
let mini = BlockMini {
|
||||
blocksize,
|
||||
slot,
|
||||
commitment_config: self.0,
|
||||
};
|
||||
Some((slot, mini))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TestCases {
|
||||
Basic,
|
||||
SlowReceiver,
|
||||
}
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() {
|
||||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
||||
tracing_subscriber::fmt::init();
|
||||
// console_subscriber::init();
|
||||
|
||||
let test_case = TestCases::SlowReceiver;
|
||||
|
||||
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
|
||||
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
|
||||
|
||||
info!(
|
||||
"Using grpc source on {} ({})",
|
||||
grpc_addr_green,
|
||||
grpc_x_token_green.is_some()
|
||||
);
|
||||
|
||||
let timeouts = GrpcConnectionTimeouts {
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
request_timeout: Duration::from_secs(5),
|
||||
subscribe_timeout: Duration::from_secs(5),
|
||||
};
|
||||
|
||||
let green_config =
|
||||
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
|
||||
|
||||
info!("Write Block stream..");
|
||||
|
||||
let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task(
|
||||
green_config.clone(),
|
||||
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
if let TestCases::SlowReceiver = test_case {
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
while let Some(message) = green_stream.recv().await {
|
||||
match message {
|
||||
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
||||
// info!("got update: {:?}", subscriber_update.update_oneof.);
|
||||
info!("got update!!!");
|
||||
}
|
||||
Message::Connecting(attempt) => {
|
||||
warn!("Connection attempt: {}", attempt);
|
||||
}
|
||||
}
|
||||
}
|
||||
warn!("Stream aborted");
|
||||
});
|
||||
|
||||
// let green_stream = create_geyser_reconnecting_stream(
|
||||
// green_config.clone(),
|
||||
// GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
|
||||
// // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
|
||||
// );
|
||||
// let multiplex_stream = create_multiplexed_stream(
|
||||
// vec![green_stream],
|
||||
// BlockMiniExtractor(CommitmentConfig::confirmed()),
|
||||
// );
|
||||
// start_example_blockmini_consumer(multiplex_stream);
|
||||
|
||||
// "infinite" sleep
|
||||
sleep(Duration::from_secs(1800)).await;
|
||||
}
|
|
@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
|
|||
use std::env;
|
||||
use std::pin::pin;
|
||||
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
|
||||
create_geyser_reconnecting_stream,
|
||||
};
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{create_geyser_reconnecting_stream, Message};
|
||||
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
|
||||
create_multiplexed_stream, FromYellowstoneExtractor,
|
||||
};
|
||||
|
@ -16,7 +14,6 @@ use tracing::warn;
|
|||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
||||
use yellowstone_grpc_proto::prost::Message as _;
|
||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message};
|
||||
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
|
||||
|
||||
fn start_example_blockmini_consumer(
|
||||
|
@ -96,13 +93,15 @@ pub async fn main() {
|
|||
|
||||
info!("Write Block stream..");
|
||||
|
||||
let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task(
|
||||
let green_stream= create_geyser_reconnecting_stream(
|
||||
green_config.clone(),
|
||||
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
|
||||
);
|
||||
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(message) = green_stream.recv().await {
|
||||
let mut green_stream = pin!(green_stream);
|
||||
while let Some(message) = green_stream.next().await {
|
||||
match message {
|
||||
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
||||
// info!("got update: {:?}", subscriber_update.update_oneof.);
|
||||
|
|
|
@ -71,7 +71,7 @@ pub fn create_geyser_autoconnection_task(
|
|||
|
||||
let jh_geyser_task = tokio::spawn(async move {
|
||||
let mut state = NotConnected(0);
|
||||
let mut messages_forwared = 0;
|
||||
let mut messages_forwarded = 0;
|
||||
|
||||
loop {
|
||||
state = match state {
|
||||
|
@ -206,12 +206,19 @@ pub fn create_geyser_autoconnection_task(
|
|||
// backpressure - should'n we block here?
|
||||
// TODO extract timeout param; TODO respect startup
|
||||
// emit warning if message not received
|
||||
let warning_threshold = if messages_forwared < 1 { Duration::from_millis(5000) } else { Duration::from_millis(500) };
|
||||
// note: first send never blocks
|
||||
let warning_threshold = if messages_forwarded == 1 { Duration::from_millis(3000) } else { Duration::from_millis(500) };
|
||||
let started_at = Instant::now();
|
||||
match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await {
|
||||
Ok(()) => {
|
||||
messages_forwared += 1;
|
||||
trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0);
|
||||
messages_forwarded += 1;
|
||||
if messages_forwarded == 1 {
|
||||
// note: first send never blocks - do not print time as this is a lie
|
||||
trace!("queued first update message");
|
||||
} else {
|
||||
trace!("queued update message {} in {:.02}ms",
|
||||
messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0);
|
||||
}
|
||||
continue 'recv_loop;
|
||||
}
|
||||
Err(SendTimeoutError::Timeout(the_message)) => {
|
||||
|
@ -219,8 +226,9 @@ pub fn create_geyser_autoconnection_task(
|
|||
|
||||
match sender.send(the_message).await {
|
||||
Ok(()) => {
|
||||
messages_forwared += 1;
|
||||
trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0);
|
||||
messages_forwarded += 1;
|
||||
trace!("queued delayed update message {} in {:.02}ms",
|
||||
messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0);
|
||||
}
|
||||
Err(_send_error ) => {
|
||||
warn!("downstream receiver closed, message is lost - aborting");
|
||||
|
|
Loading…
Reference in New Issue