From a6abda45ad3a33f95fb79ef78b099ab677787f0e Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 14 Dec 2023 18:39:12 +0100 Subject: [PATCH] pull out slot tip logic --- src/grpcmultiplex_fastestwins.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index 6a69284..1e9dc48 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -1,7 +1,7 @@ use async_stream::stream; use futures::{Stream, StreamExt}; use itertools::Itertools; -use log::{info, warn}; +use log::{debug, info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; @@ -120,13 +120,19 @@ fn map_updates(geyser_stream: S, mapper: E) -> impl Stream tip { - tip = proposed_slot; - yield block; + match update { + Some(update) => { + // take only the update messages we want + if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) { + if proposed_slot > tip { + tip = proposed_slot; + yield block; + } } } + None => { + debug!("Stream sent None"); // TODO waht does that mean? + } } } } @@ -159,11 +165,12 @@ enum ConnectionState>> { WaitReconnect, } -// Connect to Geyser and return a generic stream of SubscribeUpdate +// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate // note: stream never terminates fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, commitment_config: CommitmentConfig, + // TODO do we want Option ) -> impl Stream> { let label = grpc_source.label.clone();