This commit is contained in:
GroovieGermanikus 2023-12-14 12:58:47 +01:00
parent 8e8d7c4e07
commit c66173afef
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 89 additions and 48 deletions

View File

@ -10,7 +10,7 @@ use tokio::sync::broadcast::{Receiver};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, ExtractBlockFromStream, GrpcSourceConfig};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper, GrpcSourceConfig};
use crate::literpc_core_model::{map_produced_block, ProducedBlock};
fn start_example_consumer(mut block_stream: impl Stream<Item=ProducedBlock> + Send + 'static) {
@ -24,9 +24,9 @@ fn start_example_consumer(mut block_stream: impl Stream<Item=ProducedBlock> + Se
struct ExtractBlock(CommitmentConfig);
impl ExtractBlockFromStream for ExtractBlock {
type Block = ProducedBlock;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)> {
impl FromYellowstoneMapper for ExtractBlock {
type Target = ProducedBlock;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message))
if update_block_message.slot > current_slot =>
@ -38,24 +38,6 @@ impl ExtractBlockFromStream for ExtractBlock {
}
}
fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks> {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
blocks_subs
}
fn get_blockmeta_subscription_filter(
&self,
) -> HashMap<String, SubscribeRequestFilterBlocksMeta> {
HashMap::new()
}
}

View File

@ -5,10 +5,10 @@ use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::pin::pin;
use std::pin::{pin, Pin};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlockMeta;
use yellowstone_grpc_proto::geyser::{
@ -16,15 +16,59 @@ use yellowstone_grpc_proto::geyser::{
};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::{async_trait, Status};
pub trait ExtractBlockFromStream {
type Block;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Block)>;
fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks>;
fn get_blockmeta_subscription_filter(
&self,
) -> HashMap<String, SubscribeRequestFilterBlocksMeta>;
#[async_trait]
trait GrpcConnectionFactory: Clone {
// async fn connect() -> GeyserGrpcClientResult<impl Stream<Item=Result<SubscribeUpdate, Status>>+Sized>;
async fn connect_and_subscribe(&self) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item=Result<SubscribeUpdate, Status>>>>>;
}
#[derive(Clone)]
struct SampleConnector {
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
}
#[async_trait]
impl GrpcConnectionFactory for SampleConnector {
async fn connect_and_subscribe(&self) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item=Result<SubscribeUpdate, Status>>>>> {
let mut client = GeyserGrpcClient::connect_with_timeout(
self.grpc_addr.clone(),
self.grpc_x_token.clone(),
self.tls_config.clone(),
Some(Duration::from_secs(2)),
Some(Duration::from_secs(2)),
false,
).await?;
let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
HashMap::new(),
Some(CommitmentLevel::Finalized),
Default::default(),
None,
).await?;
Ok(Box::pin(subscribe_result))
}
}
pub trait FromYellowstoneMapper {
type Target;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)>;
// fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks>;
// fn get_blockmeta_subscription_filter(
// &self,
// ) -> HashMap<String, SubscribeRequestFilterBlocksMeta>;
}
struct ExtractBlock(CommitmentConfig);
@ -36,9 +80,9 @@ pub fn create_multiplex<E>(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
extractor: E,
) -> impl Stream<Item = E::Block>
) -> impl Stream<Item = E::Target>
where
E: ExtractBlockFromStream,
E: FromYellowstoneMapper,
{
assert!(
commitment_config == CommitmentConfig::confirmed()
@ -64,10 +108,11 @@ pub fn create_multiplex<E>(
for grpc_source in grpc_sources {
futures.push(Box::pin(create_geyser_reconnecting_stream(
grpc_source.clone(),
(
extractor.get_block_subscription_filter(),
extractor.get_blockmeta_subscription_filter(),
),
SampleConnector {
grpc_addr: grpc_source.grpc_addr.clone(),
grpc_x_token: grpc_source.grpc_x_token.clone(),
tls_config: grpc_source.tls_config.clone(),
},
commitment_config,
)));
}
@ -75,16 +120,16 @@ pub fn create_multiplex<E>(
filter_blocks(futures, extractor)
}
fn filter_blocks<S, E>(geyser_stream: S, extractor: E) -> impl Stream<Item = E::Block>
fn filter_blocks<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target>
where
S: Stream<Item = Option<SubscribeUpdate>>,
E: ExtractBlockFromStream,
E: FromYellowstoneMapper,
{
let mut current_slot: Slot = 0;
stream! {
for await update in geyser_stream {
if let Some(update) = update {
if let Some((new_slot, block)) = extractor.extract(update, current_slot) {
if let Some((new_slot, block)) = mapper.extract(update, current_slot) {
current_slot = new_slot;
yield block;
}
@ -124,10 +169,7 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
// note: stream never terminates
fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
blocks_filters: (
HashMap<String, SubscribeRequestFilterBlocks>,
HashMap<String, SubscribeRequestFilterBlocksMeta>,
),
connection_factory: impl GrpcConnectionFactory, // TODO do we need Send+Sync?
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Option<SubscribeUpdate>> {
let label = grpc_source.label.clone();
@ -154,7 +196,7 @@ fn create_geyser_reconnecting_stream(
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let (block_filter, blockmeta_filter) = blocks_filters.clone();
// let (block_filter, blockmeta_filter) = blocks_filters.clone();
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -162,6 +204,23 @@ fn create_geyser_reconnecting_stream(
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
let mut client = connect_result?;
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocksMeta {},
);
// Connected;
let subscribe_result = client
.subscribe_once(
@ -169,8 +228,8 @@ fn create_geyser_reconnecting_stream(
Default::default(),
HashMap::new(),
Default::default(),
block_filter,
blockmeta_filter,
blocks_subs,
blocksmeta_subs,
Some(commitment_level),
Default::default(),
None,