From fd973590206a1d7bc4c97245620640761f0139fb Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 15 Dec 2023 10:02:22 +0100 Subject: [PATCH] use .merge and pin the future --- .../grpcmultiplex_fastestwins_channels.rs | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/experimental/grpcmultiplex_fastestwins_channels.rs b/src/experimental/grpcmultiplex_fastestwins_channels.rs index 4136b34..aba022e 100644 --- a/src/experimental/grpcmultiplex_fastestwins_channels.rs +++ b/src/experimental/grpcmultiplex_fastestwins_channels.rs @@ -3,7 +3,6 @@ use std::collections::{HashMap}; use std::ops::{Add}; -use std::pin::pin; use anyhow::{Context}; use async_stream::stream; use futures::{pin_mut, Stream, StreamExt}; @@ -12,13 +11,14 @@ use log::{debug, info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentLevel; -use tokio::{select}; +use tokio::{pin, select}; use tokio::sync::broadcast::{Sender}; use tokio::time::{Duration, Instant, sleep_until}; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; +use merge_streams::MergeStreams; // use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block}; // use solana_lite_rpc_core::AnyhowJoinHandle; @@ -45,22 +45,28 @@ pub async fn create_multiplex( grpc_sources.len(), grpc_sources.iter().map(|source| source.label.clone()).join(", ")); - let mut futures = futures::stream::SelectAll::new(); + + let mut streams = vec![]; for grpc_source in grpc_sources { // note: stream never terminates - let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config).await; - futures.push(Box::pin(stream)); + let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config).await + .map(|update| { + // TODO wrap in new struct and add the source label + update + }); + streams.push(stream); } - pin_mut!(futures); - let mut futures = pin!(futures.next()); + let merged_streams = streams.merge(); + pin_mut!(merged_streams); + let mut merged_futures = std::pin::pin!(merged_streams.next()); let mut current_slot: Slot = 0; 'main_loop: loop { let block_cmd = select! { - message = &mut futures => { + message = &mut merged_futures => { match message { Some(message) => { map_filter_block_message(current_slot, message, commitment_config)