pull out mapper

This commit is contained in:
GroovieGermanikus 2023-12-14 13:09:43 +01:00
parent c66173afef
commit c7fa02d85b
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 17 additions and 25 deletions

View File

@ -26,14 +26,12 @@ fn start_example_consumer(mut block_stream: impl Stream<Item=ProducedBlock> + Se
struct ExtractBlock(CommitmentConfig);
impl FromYellowstoneMapper for ExtractBlock {
type Target = ProducedBlock;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)> {
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message))
if update_block_message.slot > current_slot =>
{
let block = map_produced_block(update_block_message, self.0);
Some((block.slot, block))
}
Some(UpdateOneof::Block(update_block_message)) => {
let block = map_produced_block(update_block_message, self.0);
Some((block.slot, block))
}
_ => None,
}
}

View File

@ -63,12 +63,9 @@ impl GrpcConnectionFactory for SampleConnector {
}
pub trait FromYellowstoneMapper {
// Target is something like ProducedBlock
type Target;
fn extract(&self, update: SubscribeUpdate, current_slot: Slot) -> Option<(Slot, Self::Target)>;
// fn get_block_subscription_filter(&self) -> HashMap<String, SubscribeRequestFilterBlocks>;
// fn get_blockmeta_subscription_filter(
// &self,
// ) -> HashMap<String, SubscribeRequestFilterBlocksMeta>;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>;
}
struct ExtractBlock(CommitmentConfig);
@ -108,30 +105,27 @@ pub fn create_multiplex<E>(
for grpc_source in grpc_sources {
futures.push(Box::pin(create_geyser_reconnecting_stream(
grpc_source.clone(),
SampleConnector {
grpc_addr: grpc_source.grpc_addr.clone(),
grpc_x_token: grpc_source.grpc_x_token.clone(),
tls_config: grpc_source.tls_config.clone(),
},
commitment_config,
)));
}
filter_blocks(futures, extractor)
map_updates(futures, extractor)
}
fn filter_blocks<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target>
fn map_updates<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Target>
where
S: Stream<Item = Option<SubscribeUpdate>>,
E: FromYellowstoneMapper,
{
let mut current_slot: Slot = 0;
let mut tip: Slot = 0;
stream! {
for await update in geyser_stream {
if let Some(update) = update {
if let Some((new_slot, block)) = mapper.extract(update, current_slot) {
current_slot = new_slot;
yield block;
if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) {
if proposed_slot > tip {
tip = proposed_slot;
yield block;
}
}
}
}
@ -165,11 +159,10 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
WaitReconnect,
}
// TODO use GrpcSource
// Connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
connection_factory: impl GrpcConnectionFactory, // TODO do we need Send+Sync?
commitment_config: CommitmentConfig,
) -> impl Stream<Item = Option<SubscribeUpdate>> {
let label = grpc_source.label.clone();
@ -204,6 +197,7 @@ fn create_geyser_reconnecting_stream(
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
let mut client = connect_result?;
// TODO make filter configurable for caller
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),