From c7fa02d85b79be84b658be64a3b575397ed21ae2 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 14 Dec 2023 13:09:43 +0100 Subject: [PATCH] pull out mapper --- examples/stream_blocks_mainnet.rs | 12 +++++------- src/grpcmultiplex_fastestwins.rs | 30 ++++++++++++------------------ 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 7cd551e..2db43d3 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -26,14 +26,12 @@ fn start_example_consumer(mut block_stream: impl Stream + Se struct ExtractBlock(CommitmentConfig); impl FromYellowstoneMapper for ExtractBlock { type Target = ProducedBlock; - fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)> { + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { match update.update_oneof { - Some(UpdateOneof::Block(update_block_message)) - if update_block_message.slot > current_slot => - { - let block = map_produced_block(update_block_message, self.0); - Some((block.slot, block)) - } + Some(UpdateOneof::Block(update_block_message)) => { + let block = map_produced_block(update_block_message, self.0); + Some((block.slot, block)) + } _ => None, } } diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index 76701cf..6a69284 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -63,12 +63,9 @@ impl GrpcConnectionFactory for SampleConnector { } pub trait FromYellowstoneMapper { + // Target is something like ProducedBlock type Target; - fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)>; - // fn get_block_subscription_filter(&self) -> HashMap; - // fn get_blockmeta_subscription_filter( - // &self, - // ) -> HashMap; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>; } struct ExtractBlock(CommitmentConfig); @@ -108,30 +105,27 @@ pub fn create_multiplex( for grpc_source in grpc_sources { futures.push(Box::pin(create_geyser_reconnecting_stream( grpc_source.clone(), - SampleConnector { - grpc_addr: grpc_source.grpc_addr.clone(), - grpc_x_token: grpc_source.grpc_x_token.clone(), - tls_config: grpc_source.tls_config.clone(), - }, commitment_config, ))); } - filter_blocks(futures, extractor) + map_updates(futures, extractor) } -fn filter_blocks(geyser_stream: S, mapper: E) -> impl Stream +fn map_updates(geyser_stream: S, mapper: E) -> impl Stream where S: Stream>, E: FromYellowstoneMapper, { - let mut current_slot: Slot = 0; + let mut tip: Slot = 0; stream! { for await update in geyser_stream { if let Some(update) = update { - if let Some((new_slot, block)) = mapper.extract(update, current_slot) { - current_slot = new_slot; - yield block; + if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) { + if proposed_slot > tip { + tip = proposed_slot; + yield block; + } } } } @@ -165,11 +159,10 @@ enum ConnectionState>> { WaitReconnect, } -// TODO use GrpcSource +// Connect to Geyser and return a generic stream of SubscribeUpdate // note: stream never terminates fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, - connection_factory: impl GrpcConnectionFactory, // TODO do we need Send+Sync? commitment_config: CommitmentConfig, ) -> impl Stream> { let label = grpc_source.label.clone(); @@ -204,6 +197,7 @@ fn create_geyser_reconnecting_stream( Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; let mut client = connect_result?; + // TODO make filter configurable for caller let mut blocks_subs = HashMap::new(); blocks_subs.insert( "client".to_string(),