minor cleanups

This commit is contained in:
GroovieGermanikus 2023-12-15 11:43:00 +01:00
parent 16d9019f25
commit 6081426f74
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 37 additions and 30 deletions

View File

@ -5,17 +5,17 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::pin::pin; use std::pin::pin;
use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock}; use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock};
use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig; use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::geyser::SubscribeUpdate;
fn start_example_consumer(block_stream: impl Stream<Item = ProducedBlock> + Send + 'static) { fn start_example_consumer(multiplex_stream: impl Stream<Item = ProducedBlock> + Send + 'static) {
tokio::spawn(async move { tokio::spawn(async move {
let mut block_stream = pin!(block_stream); let mut block_stream = pin!(multiplex_stream);
while let Some(block) = block_stream.next().await { while let Some(block) = block_stream.next().await {
info!("received block #{}", block.slot,); info!("received block #{} from multiplexer", block.slot,);
} }
}); });
} }
@ -37,7 +37,7 @@ impl FromYellowstoneMapper for ExtractBlock {
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// RUST_LOG=info,grpc_using_streams=debug // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// console_subscriber::init(); // console_subscriber::init();
@ -56,10 +56,13 @@ pub async fn main() {
let toxiproxy_config = let toxiproxy_config =
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None); GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);
let green_stream = create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
let blue_stream = create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
let toxiproxy_stream = create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
let multiplex_stream = create_multiplex( let multiplex_stream = create_multiplex(
vec![green_config, blue_config, toxiproxy_config], vec![green_stream, blue_stream, toxiproxy_stream],
CommitmentConfig::finalized(), CommitmentConfig::finalized(),
ExtractBlock(CommitmentConfig::confirmed()), ExtractBlock(CommitmentConfig::finalized()),
); );
start_example_consumer(multiplex_stream); start_example_consumer(multiplex_stream);

View File

@ -4,6 +4,7 @@ use log::{debug, info, trace, warn};
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
@ -67,15 +68,17 @@ pub fn create_geyser_reconnecting_stream(
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"), _ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
}; };
// NOT_CONNECTED; CONNECTING
let mut state = ConnectionState::NotConnected; let mut state = ConnectionState::NotConnected;
let connection_attempts = AtomicI32::new(0);
// in case of cancellation, we restart from here: // in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro // thus we want to keep the progression in a state object outside the stream! makro
stream! { stream! {
loop{ loop {
let yield_value; let yield_value;
(state, yield_value) = match state { (state, yield_value) = match state {
ConnectionState::NotConnected => { ConnectionState::NotConnected => {
let connection_task = tokio::spawn({ let connection_task = tokio::spawn({
@ -83,6 +86,7 @@ pub fn create_geyser_reconnecting_stream(
let token = grpc_source.grpc_x_token.clone(); let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone(); let config = grpc_source.tls_config.clone();
// let (block_filter, blockmeta_filter) = blocks_filters.clone(); // let (block_filter, blockmeta_filter) = blocks_filters.clone();
info!("Connecting attempt #{} to {}", connection_attempts.fetch_add(1, Ordering::Relaxed), addr);
async move { async move {
let connect_result = GeyserGrpcClient::connect_with_timeout( let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -128,6 +132,7 @@ pub fn create_geyser_reconnecting_stream(
(ConnectionState::Connecting(connection_task), None) (ConnectionState::Connecting(connection_task), None)
} }
ConnectionState::Connecting(connection_task) => { ConnectionState::Connecting(connection_task) => {
let subscribe_result = connection_task.await; let subscribe_result = connection_task.await;
@ -144,11 +149,12 @@ pub fn create_geyser_reconnecting_stream(
} }
} }
ConnectionState::Ready(mut geyser_stream) => { ConnectionState::Ready(mut geyser_stream) => {
match geyser_stream.next().await { match geyser_stream.next().await {
Some(Ok(update_message)) => { Some(Ok(update_message)) => {
trace!("> update message on {}", label); trace!("> recv update message from {}", label);
(ConnectionState::Ready(geyser_stream), Some(update_message)) (ConnectionState::Ready(geyser_stream), Some(update_message))
} }
Some(Err(tonic_status)) => { Some(Err(tonic_status)) => {
@ -164,12 +170,15 @@ pub fn create_geyser_reconnecting_stream(
} }
} }
ConnectionState::WaitReconnect => { ConnectionState::WaitReconnect => {
// TODO implement backoff info!("Waiting a bit, then connect to {}", label);
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
(ConnectionState::NotConnected, None) (ConnectionState::NotConnected, None)
} }
}; // -- match }; // -- match
yield yield_value yield yield_value
} }

View File

@ -13,14 +13,15 @@ pub trait FromYellowstoneMapper {
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>; fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>;
} }
pub fn create_multiplex<E>( /// use streams created by ``create_geyser_reconnecting_stream``
// TODO provide list of streams /// note: this is agnostic to the type of the stream
grpc_sources: Vec<GrpcSourceConfig>, pub fn create_multiplex<M>(
grpc_source_streams: Vec<impl Stream<Item = Option<SubscribeUpdate>>>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
extractor: E, mapper: M,
) -> impl Stream<Item = E::Target> ) -> impl Stream<Item = M::Target>
where where
E: FromYellowstoneMapper, M: FromYellowstoneMapper,
{ {
assert!( assert!(
commitment_config == CommitmentConfig::confirmed() commitment_config == CommitmentConfig::confirmed()
@ -29,29 +30,23 @@ where
); );
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic // note: PROCESSED blocks are not sequential in presense of forks; this will break the logic
if grpc_sources.is_empty() { if grpc_source_streams.is_empty() {
panic!("Must have at least one source"); panic!("Must have at least one source");
} }
info!( info!(
"Starting multiplexer with {} sources: {}", "Starting multiplexer with {} sources",
grpc_sources.len(), grpc_source_streams.len(),
grpc_sources
.iter()
.map(|source| source.label.clone())
.join(", ")
); );
// use merge
let mut futures = futures::stream::SelectAll::new(); let mut futures = futures::stream::SelectAll::new();
for grpc_source in grpc_sources { for grpc_source in grpc_source_streams {
futures.push(Box::pin(create_geyser_reconnecting_stream( futures.push(Box::pin(grpc_source));
grpc_source.clone(),
commitment_config,
)));
} }
map_updates(futures, extractor) map_updates(futures, mapper)
} }
fn map_updates<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target> fn map_updates<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target>