diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 9b9e7da..7cd551e 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -10,7 +10,7 @@ use tokio::sync::broadcast::{Receiver}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, ExtractBlockFromStream, GrpcSourceConfig}; +use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper, GrpcSourceConfig}; use crate::literpc_core_model::{map_produced_block, ProducedBlock}; fn start_example_consumer(mut block_stream: impl Stream + Send + 'static) { @@ -24,9 +24,9 @@ fn start_example_consumer(mut block_stream: impl Stream + Se struct ExtractBlock(CommitmentConfig); -impl ExtractBlockFromStream for ExtractBlock { - type Block = ProducedBlock; - fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)> { +impl FromYellowstoneMapper for ExtractBlock { + type Target = ProducedBlock; + fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)> { match update.update_oneof { Some(UpdateOneof::Block(update_block_message)) if update_block_message.slot > current_slot => @@ -38,24 +38,6 @@ impl ExtractBlockFromStream for ExtractBlock { } } - fn get_block_subscription_filter(&self) -> HashMap { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - blocks_subs - } - fn get_blockmeta_subscription_filter( - &self, - ) -> HashMap { - HashMap::new() - } } diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index e447459..76701cf 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -5,10 +5,10 @@ use log::{info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; -use std::pin::pin; +use std::pin::{pin, Pin}; use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta; use yellowstone_grpc_proto::geyser::{ @@ -16,15 +16,59 @@ use yellowstone_grpc_proto::geyser::{ }; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; -use yellowstone_grpc_proto::tonic::Status; +use yellowstone_grpc_proto::tonic::{async_trait, Status}; -pub trait ExtractBlockFromStream { - type Block; - fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)>; - fn get_block_subscription_filter(&self) -> HashMap; - fn get_blockmeta_subscription_filter( - &self, - ) -> HashMap; + +#[async_trait] +trait GrpcConnectionFactory: Clone { + // async fn connect() -> GeyserGrpcClientResult>+Sized>; + async fn connect_and_subscribe(&self) -> GeyserGrpcClientResult>>>>; +} + +#[derive(Clone)] +struct SampleConnector { + grpc_addr: String, + grpc_x_token: Option, + tls_config: Option, +} + +#[async_trait] +impl GrpcConnectionFactory for SampleConnector { + async fn connect_and_subscribe(&self) -> GeyserGrpcClientResult>>>> { + let mut client = GeyserGrpcClient::connect_with_timeout( + self.grpc_addr.clone(), + self.grpc_x_token.clone(), + self.tls_config.clone(), + Some(Duration::from_secs(2)), + Some(Duration::from_secs(2)), + false, + ).await?; + + let subscribe_result = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + HashMap::new(), + HashMap::new(), + Some(CommitmentLevel::Finalized), + Default::default(), + None, + ).await?; + + Ok(Box::pin(subscribe_result)) + } + +} + +pub trait FromYellowstoneMapper { + type Target; + fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)>; + // fn get_block_subscription_filter(&self) -> HashMap; + // fn get_blockmeta_subscription_filter( + // &self, + // ) -> HashMap; } struct ExtractBlock(CommitmentConfig); @@ -36,9 +80,9 @@ pub fn create_multiplex( grpc_sources: Vec, commitment_config: CommitmentConfig, extractor: E, -) -> impl Stream +) -> impl Stream where - E: ExtractBlockFromStream, + E: FromYellowstoneMapper, { assert!( commitment_config == CommitmentConfig::confirmed() @@ -64,10 +108,11 @@ pub fn create_multiplex( for grpc_source in grpc_sources { futures.push(Box::pin(create_geyser_reconnecting_stream( grpc_source.clone(), - ( - extractor.get_block_subscription_filter(), - extractor.get_blockmeta_subscription_filter(), - ), + SampleConnector { + grpc_addr: grpc_source.grpc_addr.clone(), + grpc_x_token: grpc_source.grpc_x_token.clone(), + tls_config: grpc_source.tls_config.clone(), + }, commitment_config, ))); } @@ -75,16 +120,16 @@ pub fn create_multiplex( filter_blocks(futures, extractor) } -fn filter_blocks(geyser_stream: S, extractor: E) -> impl Stream +fn filter_blocks(geyser_stream: S, mapper: E) -> impl Stream where S: Stream>, - E: ExtractBlockFromStream, + E: FromYellowstoneMapper, { let mut current_slot: Slot = 0; stream! { for await update in geyser_stream { if let Some(update) = update { - if let Some((new_slot, block)) = extractor.extract(update, current_slot) { + if let Some((new_slot, block)) = mapper.extract(update, current_slot) { current_slot = new_slot; yield block; } @@ -124,10 +169,7 @@ enum ConnectionState>> { // note: stream never terminates fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, - blocks_filters: ( - HashMap, - HashMap, - ), + connection_factory: impl GrpcConnectionFactory, // TODO do we need Send+Sync? commitment_config: CommitmentConfig, ) -> impl Stream> { let label = grpc_source.label.clone(); @@ -154,7 +196,7 @@ fn create_geyser_reconnecting_stream( let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); - let (block_filter, blockmeta_filter) = blocks_filters.clone(); + // let (block_filter, blockmeta_filter) = blocks_filters.clone(); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -162,6 +204,23 @@ fn create_geyser_reconnecting_stream( Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; let mut client = connect_result?; + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + let mut blocksmeta_subs = HashMap::new(); + blocksmeta_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocksMeta {}, + ); + // Connected; let subscribe_result = client .subscribe_once( @@ -169,8 +228,8 @@ fn create_geyser_reconnecting_stream( Default::default(), HashMap::new(), Default::default(), - block_filter, - blockmeta_filter, + blocks_subs, + blocksmeta_subs, Some(commitment_level), Default::default(), None,