// geyser-grpc-connector/examples/stream_blocks_autoconnect.rs
//
// 142 lines
// 4.7 KiB
// Rust

use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use tokio::sync::broadcast;
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
/// Compact summary of a block-level update taken from the geyser stream.
///
/// Built by `BlockMiniExtractor` from either a `Block` or a `BlockMeta` update.
pub struct BlockMini {
/// Value of `encoded_len()` on the originating update message.
pub blocksize: usize,
/// Slot copied from the originating update message.
pub slot: Slot,
/// Commitment level carried by the extractor that produced this value.
pub commitment_config: CommitmentConfig,
}
/// Extractor that condenses block-level stream updates into [`BlockMini`] values.
///
/// The wrapped commitment config is copied into every `BlockMini` it emits.
struct BlockMiniExtractor(CommitmentConfig);

impl FromYellowstoneExtractor for BlockMiniExtractor {
    type Target = BlockMini;

    /// Maps `Block` and `BlockMeta` updates to `(slot, BlockMini)`.
    ///
    /// Returns `None` when the update carries no payload or a payload of any other kind.
    fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
        // Both supported payloads contribute the same two facts: slot and encoded length.
        let (slot, blocksize) = match update.update_oneof? {
            UpdateOneof::Block(block) => (block.slot, block.encoded_len()),
            UpdateOneof::BlockMeta(block_meta) => (block_meta.slot, block_meta.encoded_len()),
            _ => return None,
        };

        let summary = BlockMini {
            blocksize,
            slot,
            commitment_config: self.0,
        };
        Some((slot, summary))
    }
}
/// Manual test scenarios that alter how the receiver task in `main` behaves.
// Only the variant assigned to `TEST_CASE` is ever constructed, so the others would
// otherwise trigger the dead-code lint.
#[allow(dead_code)]
enum TestCases {
/// No special handling: receive messages and log them.
Basic,
/// The receiver task sleeps 5 seconds before its first `recv`.
SlowReceiverStartup,
/// The receiver task sleeps 1500 ms after a message whenever `message_count % 3 == 1`.
TemporaryLaggingReceiver,
/// The receiver task returns right after logging its first geyser update.
CloseAfterReceiving,
/// The receiver task aborts the geyser task once more than five updates were counted.
AbortTaskFromOutside,
}
/// Scenario compiled into this example; edit the value and rebuild to run another one.
const TEST_CASE: TestCases = TestCases::Basic;
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
// console_subscriber::init();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
info!(
"Using grpc source on {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
info!("Write Block stream..");
let (_, exit_notify) = broadcast::channel(1);
let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
exit_notify,
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
tokio::spawn(async move {
if let TestCases::SlowReceiverStartup = TEST_CASE {
sleep(Duration::from_secs(5)).await;
}
let mut message_count = 0;
while let Ok(message) = message_channel.recv().await {
if let TestCases::AbortTaskFromOutside = TEST_CASE {
if message_count > 5 {
info!("(testcase) aborting task from outside");
jh_geyser_task.abort();
}
}
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
info!("got update - {} bytes", subscriber_update.encoded_len());
if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving");
return;
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
if let TestCases::TemporaryLaggingReceiver = TEST_CASE {
if message_count % 3 == 1 {
info!("(testcase) lagging a bit");
sleep(Duration::from_millis(1500)).await;
}
}
}
warn!("Stream aborted");
});
// "infinite" sleep
sleep(Duration::from_secs(2000)).await;
}