implement channelizer

This commit is contained in:
GroovieGermanikus 2023-12-19 13:42:26 +01:00
parent aff421addb
commit 3d50dce545
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 45 additions and 35 deletions

View File

@ -7,7 +7,7 @@ use std::pin::pin;
use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneExtractor};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplexed_stream, FromYellowstoneExtractor};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
@ -109,7 +109,7 @@ pub async fn main() {
create_geyser_reconnecting_stream(blue_config.clone(), GeyserFilter::blocks_and_txs(), CommitmentConfig::confirmed());
let toxiproxy_stream =
create_geyser_reconnecting_stream(toxiproxy_config.clone(), GeyserFilter::blocks_and_txs(), CommitmentConfig::confirmed());
let multiplex_stream = create_multiplex(
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],
BlockExtractor(CommitmentConfig::confirmed()),
);
@ -124,7 +124,7 @@ pub async fn main() {
create_geyser_reconnecting_stream(blue_config.clone(), GeyserFilter::blocks_meta(), CommitmentConfig::confirmed());
let toxiproxy_stream =
create_geyser_reconnecting_stream(toxiproxy_config.clone(), GeyserFilter::blocks_meta(), CommitmentConfig::confirmed());
let multiplex_stream = create_multiplex(
let multiplex_stream = create_multiplexed_stream(
vec![green_stream, blue_stream, toxiproxy_stream],
BlockMetaExtractor(CommitmentConfig::confirmed()),
);

View File

@ -1,5 +1,5 @@
use std::pin::pin;
use log::{debug, info};
use log::{debug, info, trace};
use crate::grpc_subscription_autoreconnect::Message;
use crate::grpc_subscription_autoreconnect::Message::GeyserSubscribeUpdate;
use async_stream::stream;
@ -7,31 +7,40 @@ use futures::{Stream, StreamExt};
use merge_streams::MergeStreams;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::spawn;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use crate::experimental::mock_literpc_core::ProducedBlock;
// pub async fn forward_to_channel<T: Clone>(multiplex_stream: impl Stream<Item = T> + Send + 'static) -> JoinHandle<()> {
// let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(10);
// tokio::task::spawn(async move {
// let mut block_stream = pin!(multiplex_stream);
// 'main_loop: while let Some(block) = block_stream.next().await {
// match block_sx.send(block) {
// Ok(receivers) => {
// debug!("sent block to {} receivers", receivers);
// }
// Err(send_error) => {
// match send_error {
// SendError(_) => {
// info!("Stop sending blocks on stream - shutting down");
// break 'main_loop;
// }
// }
// }
// };
// }
// panic!("forward task failed");
// })
//
// }
pub async fn channelize_stream<T>(source: impl Stream<Item = T> + Send + 'static) -> (Receiver<T>, JoinHandle<()>)
where
T: Clone + Send + Sync + 'static,
{
let (tx, multiplexed_finalized_blocks) = tokio::sync::broadcast::channel::<T>(1000);
let jh_channelizer = spawn(async move {
let mut block_stream = pin!(source);
'main_loop: while let Some(block) = block_stream.next().await {
debug!("multiplex -> ...");
match tx.send(block) {
Ok(receivers) => {
trace!("sent data to {} receivers", receivers);
}
Err(send_error) => {
match send_error {
SendError(_) => {
debug!("no active blockreceivers - skipping message");
continue 'main_loop;
}
}
}
};
}
panic!("forward task failed");
});
(multiplexed_finalized_blocks, jh_channelizer)
}

View File

@ -37,7 +37,7 @@ pub struct GrpcSourceConfig {
impl Display for GrpcSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "grpc_addr: {}", crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr))
write!(f, "grpc_addr {}", crate::obfuscate::url_obfuscate_api_token(&self.grpc_addr))
}
}

View File

@ -16,15 +16,10 @@ pub trait FromYellowstoneExtractor {
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>;
}
struct TaggedMessage {
pub stream_idx: usize,
pub payload: Message,
}
/// use streams created by ``create_geyser_reconnecting_stream``
/// this is agnostic to the type of the stream
/// CAUTION: do not try to use with commitment level "processed" as this will form trees (forks) and not a sequence
pub fn create_multiplex<E>(
pub fn create_multiplexed_stream<E>(
grpc_source_streams: Vec<impl Stream<Item = Message>>,
extractor: E,
) -> impl Stream<Item = E::Target>
@ -54,6 +49,12 @@ where
extract_payload_from_geyser_updates(merged_streams, extractor)
}
struct TaggedMessage {
pub stream_idx: usize,
pub payload: Message,
}
fn extract_payload_from_geyser_updates<E>(merged_stream: impl Stream<Item = TaggedMessage>, extractor: E) -> impl Stream<Item = E::Target>
where
E: FromYellowstoneExtractor,

View File

@ -2,5 +2,5 @@ pub mod experimental;
pub mod grpc_subscription;
pub mod grpc_subscription_autoreconnect;
pub mod grpcmultiplex_fastestwins;
mod grpc_stream_utils;
pub mod grpc_stream_utils;
mod obfuscate;