use GeyserFilter as factory

This commit is contained in:
GroovieGermanikus 2023-12-21 07:52:37 +01:00
parent 9fb7c0dda0
commit bcd3f77842
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 67 additions and 63 deletions

View File

@ -144,17 +144,17 @@ pub async fn main() {
info!("Write Block stream..");
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter::blocks_and_txs(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
CommitmentConfig::confirmed(),
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter::blocks_and_txs(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
CommitmentConfig::confirmed(),
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter::blocks_and_txs(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
CommitmentConfig::confirmed(),
);
let multiplex_stream = create_multiplexed_stream(
@ -168,17 +168,17 @@ pub async fn main() {
info!("Write BlockMeta stream..");
let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter::blocks_meta(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
CommitmentConfig::confirmed(),
);
let blue_stream = create_geyser_reconnecting_stream(
blue_config.clone(),
GeyserFilter::blocks_meta(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
CommitmentConfig::confirmed(),
);
let toxiproxy_stream = create_geyser_reconnecting_stream(
toxiproxy_config.clone(),
GeyserFilter::blocks_meta(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
CommitmentConfig::confirmed(),
);
let multiplex_stream = create_multiplexed_stream(

View File

@ -9,7 +9,7 @@ use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::{async_trait, Status};
@ -88,37 +88,59 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
}
#[derive(Clone)]
pub enum GeyserFilter {
Blocks(SubscribeRequestFilterBlocks),
BlocksMeta(SubscribeRequestFilterBlocksMeta),
}
pub struct GeyserFilter(pub CommitmentConfig);
impl GeyserFilter {
pub fn blocks_and_txs() -> Self {
Self::Blocks(SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
})
pub fn blocks_and_txs(&self) -> SubscribeRequest {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
pub fn blocks_filter(filter: SubscribeRequestFilterBlocks) -> Self {
Self::Blocks(filter)
}
// no parameters available
pub fn blocks_meta() -> Self {
Self::BlocksMeta(SubscribeRequestFilterBlocksMeta {})
pub fn blocks_meta(&self) -> SubscribeRequest {
let mut blocksmeta_subs = HashMap::new();
blocksmeta_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocksMeta {},
);
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
}
}
}
// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
filter: GeyserFilter,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Message> {
// solana_sdk -> yellowstone
fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Processed
@ -136,6 +158,16 @@ pub fn create_geyser_reconnecting_stream(
)
}
};
commitment_level
}
// Take geyser filter, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(0);
@ -157,7 +189,7 @@ pub fn create_geyser_reconnecting_stream(
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let filter = filter.clone();
let subscribe_filter = subscribe_filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {
@ -169,40 +201,12 @@ pub fn create_geyser_reconnecting_stream(
.await;
let mut client = connect_result?;
let mut blocks_subs = HashMap::new();
let mut blocksmeta_subs = HashMap::new();
match filter {
GeyserFilter::Blocks(filter) => {
blocks_subs.insert(
"client".to_string(),
filter,
);
}
GeyserFilter::BlocksMeta(filter) => {
blocksmeta_subs.insert(
"client".to_string(),
filter,
);
}
}
debug!("Subscribe with blocks filter {:?} and blocksmeta filter {:?}", blocks_subs, blocksmeta_subs);
let subscribe_request = SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: blocksmeta_subs,
commitment: Some(commitment_level as i32),
accounts_data_slice: Default::default(),
ping: None,
};
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once2(subscribe_request))
.subscribe_once2(subscribe_filter))
.await;
// maybe not optimal