use .merge and pin the future

This commit is contained in:
GroovieGermanikus 2023-12-15 10:02:22 +01:00
parent e5c4ed0d0a
commit fd97359020
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 14 additions and 8 deletions

View File

@ -3,7 +3,6 @@
use std::collections::{HashMap};
use std::ops::{Add};
use std::pin::pin;
use anyhow::{Context};
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
@ -12,13 +11,14 @@ use log::{debug, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::CommitmentLevel;
use tokio::{select};
use tokio::{pin, select};
use tokio::sync::broadcast::{Sender};
use tokio::time::{Duration, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use merge_streams::MergeStreams;
// use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
// use solana_lite_rpc_core::AnyhowJoinHandle;
@ -45,22 +45,28 @@ pub async fn create_multiplex(
grpc_sources.len(),
grpc_sources.iter().map(|source| source.label.clone()).join(", "));
let mut futures = futures::stream::SelectAll::new();
let mut streams = vec![];
for grpc_source in grpc_sources {
// note: stream never terminates
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config).await;
futures.push(Box::pin(stream));
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config).await
.map(|update| {
// TODO wrap in new struct and add the source label
update
});
streams.push(stream);
}
pin_mut!(futures);
let mut futures = pin!(futures.next());
let merged_streams = streams.merge();
pin_mut!(merged_streams);
let mut merged_futures = std::pin::pin!(merged_streams.next());
let mut current_slot: Slot = 0;
'main_loop: loop {
let block_cmd = select! {
message = &mut futures => {
message = &mut merged_futures => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)