pull out slot tip logic

This commit is contained in:
GroovieGermanikus 2023-12-14 18:39:12 +01:00
parent c7fa02d85b
commit a6abda45ad
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 14 additions and 7 deletions

View File

@ -1,7 +1,7 @@
use async_stream::stream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use log::{info, warn};
use log::{debug, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
@ -120,13 +120,19 @@ fn map_updates<S, E>(geyser_stream: S, mapper: E) -> impl Stream<Item = E::Targe
let mut tip: Slot = 0;
stream! {
for await update in geyser_stream {
if let Some(update) = update {
if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) {
if proposed_slot > tip {
tip = proposed_slot;
yield block;
match update {
Some(update) => {
// take only the update messages we want
if let Some((proposed_slot, block)) = mapper.map_yellowstone_update(update) {
if proposed_slot > tip {
tip = proposed_slot;
yield block;
}
}
}
None => {
debug!("Stream sent None"); // TODO waht does that mean?
}
}
}
}
@ -159,11 +165,12 @@ enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
WaitReconnect,
}
// Connect to Geyser and return a generic stream of SubscribeUpdate
// Takes geyser filter for geyser, connect to Geyser and return a generic stream of SubscribeUpdate
// note: stream never terminates
fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_config: CommitmentConfig,
// TODO do we want Option<SubscribeUpdate>
) -> impl Stream<Item = Option<SubscribeUpdate>> {
let label = grpc_source.label.clone();