diff --git a/README.md b/README.md index 6ab211c..6232805 100644 --- a/README.md +++ b/README.md @@ -8,18 +8,3 @@ Multiple _Futures_ are executed in parallel and the first result is returned. No __guarantees__ are made about if the messages are continuous or not. -## _Fastest Wins Strategy_ using task/channel (experimental) -(see `grpcmultiplex_fastestwins_channels.rs`) - -This implementation is built with _Tokio Tasks and Channels_ instead of futures. -It is simpler and more predictable than the _Futures_ implementation. -It uses a _push model_ rather than a _pull model_, i.e. the _task_ pushes the result to the _channel_ while the _Futures_ implementation pulls the result from the chain of _Futures_. - -## _Perfect Seq Strategy_ (experimental) -(see `grpcmultiplex_perfectseq.rs`) - -This strategy **guarantees** that the messages are **continuous** and in the **correct order**. -The _continuous_ property is defined by linking a _Block_ with previous _Block_ using `block.parent_slot` and `block.parent_hash`. -The **disadvantage** is that it will not produce messages if a _Block_ is missing. - -It is limited to the commitment levels __confirmed__ and __finalized__. For __processed__ level there is no sequence due to the potential presence of forks with form a tree. diff --git a/examples/drain_to_tip_pattern.rs b/examples/drain_to_tip_pattern.rs deleted file mode 100644 index 56d34d6..0000000 --- a/examples/drain_to_tip_pattern.rs +++ /dev/null @@ -1,115 +0,0 @@ -use derive_more::Display; - -use log::{debug, error, info}; -use tokio::select; -use tokio::sync::broadcast::{Receiver, Sender}; -use tokio::time::{sleep, Duration}; - -#[derive(Debug, Clone, Display)] -struct Message { - slot: u64, -} - -impl Message { - fn new(slot: u64) -> Self { - Message { slot } - } -} - -#[tokio::main] -async fn main() { - // RUST_LOG=info,stream_via_grpc=debug,drain_to_tip_pattern=debug - tracing_subscriber::fmt::init(); - - let (tx, rx) = tokio::sync::broadcast::channel::(1000); - let (tx_tip, _) = tokio::sync::watch::channel::(Message::new(0)); - - start_progressor(rx, tx_tip.subscribe()).await; - - send_stream(tx.clone()).await; - - // move tip; current tip is 3; next offered slot is 4 - info!("==> force tip to 6 - expect progressor to unblock and offer 7"); - tx_tip.send(Message::new(6)).unwrap(); - - info!("Blocking main thread for some time to allow the system to operate..."); - sleep(tokio::time::Duration::from_secs(4)).await; - info!("Num broadcast subscribers: {}", tx_tip.receiver_count()); - - info!("Shutting down...."); - drop(tx_tip); - sleep(Duration::from_secs(1)).await; - info!("Shutdown completed."); -} - -// this service is dedicated to one source channel which produces a monotonic stream of messages qualified by slot number -// service maintains a tip variable which is updated by different part of system -// service response to tip changes by blocking until the message received from stream has slot number greater than the tip -// service "offers" this message to the rest of the system -async fn start_progressor( - mut blocks_notifier: Receiver, - mut rx_tip: tokio::sync::watch::Receiver, -) { - info!("Started progressor"); - tokio::spawn(async move { - let mut local_tip = Message::new(3); - // block after tip offered by this stream - // TODO: block_after_tip is only valid/useful if greater than tip - let mut highest_block = Message::new(0); - - 'main_loop: loop { - select! 
{ - result = rx_tip.changed() => { - if result.is_err() { - debug!("Tip variable closed"); - break 'main_loop; - } - local_tip = rx_tip.borrow_and_update().clone(); - info!("++> tip changed to {}", local_tip); - if local_tip.slot <= highest_block.slot { - info!("!! next offered slot is invalid: {} <= {}", highest_block.slot, local_tip); - } - // slow down in case of loop - // sleep(Duration::from_millis(100)).await; - } - - // here goes the strategy: either we get a new block OR a timeout - recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip.slot => { - debug!("!! block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot); - match recv_result { - Ok(msg) => { - info!("=> recv: {}", msg); - if msg.slot > local_tip.slot { - info!("==> offer next slot ({} -> {})", local_tip, msg.slot); - highest_block = msg; - // offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap(); - // this thread will sleep and not issue any recvs until we get tip.changed signal - continue 'main_loop; - } - } - Err(e) => { - // TODO what to do? - error!("Error receiving block: {}", e); - break 'main_loop; - } - } - } - } - } // -- main loop - - info!("Shutting down progressor."); - }); -} - -async fn send_stream(message_channel: Sender) { - // tip is 3 - - // drain 0 to 3; offer 4, then block - for i in 0..10 { - info!("> sending {} (queue size={})", i, message_channel.len()); - message_channel.send(Message::new(i)).unwrap(); - sleep(Duration::from_millis(100)).await; - } - - assert_eq!(message_channel.len(), 5); -} diff --git a/examples/grpcmultiplex_perfectseq.rs b/examples/grpcmultiplex_perfectseq.rs deleted file mode 100644 index 67eacff..0000000 --- a/examples/grpcmultiplex_perfectseq.rs +++ /dev/null @@ -1,406 +0,0 @@ -use anyhow::{bail, Context}; -use futures::StreamExt; -use itertools::Itertools; -use std::collections::HashMap; - -use log::{debug, error, info, warn}; -use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::CommitmentConfig; -use tokio::select; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::broadcast::{Receiver, Sender}; -use tokio::time::{sleep, timeout, Duration}; -use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequestFilterBlocksMeta, SubscribeUpdateBlockMeta, -}; - -pub const GRPC_VERSION: &str = "1.16.1"; - -#[tokio::main] -// #[tokio::main(flavor = "multi_thread", worker_threads = 16)] -pub async fn main() { - // RUST_LOG=info,grpcmultiplex_perfectseq=debug,drain_to_tip_pattern=debug - tracing_subscriber::fmt::init(); - - // mango validator (mainnet) - let _grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string(); - // ams81 (mainnet) - let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string(); - // testnet - NOTE: this connection has terrible lags (almost 5 minutes) - // let grpc_addr = "http://147.28.169.13:10000".to_string(); - - // let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000); - let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::(1000); - let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000); - - // TODO ship ProducedBlock - let (sx_multi, rx_multi) = tokio::sync::broadcast::channel::(1000); - - let grpc_x_token = None; - let _block_confirmed_task_green = create_blockmeta_processing_task( - _grpc_addr_mainnet_triton.clone(), - 
grpc_x_token.clone(), - block_sx_green.clone(), - CommitmentLevel::Confirmed, - ); - - let _block_confirmed_task_blue = create_blockmeta_processing_task( - grpc_addr_mainnet_ams81.clone(), - grpc_x_token.clone(), - block_sx_blue.clone(), - CommitmentLevel::Confirmed, - ); - - let (tx_tip, _) = tokio::sync::watch::channel::(0); - - let (offer_block_sender, mut offer_block_notifier) = - tokio::sync::mpsc::channel::(100); - - // producers - start_progressor( - "green".to_string(), - blocks_notifier_green, - tx_tip.subscribe(), - offer_block_sender.clone(), - ) - .await; - start_progressor( - "blue".to_string(), - blocks_notifier_blue, - tx_tip.subscribe(), - offer_block_sender.clone(), - ) - .await; - - // merge task - // collect the offered slots from the two channels - tokio::spawn(async move { - // need to wait until channels reached slot beyond tip - // tokio::time::sleep(Duration::from_secs(14)).await; - - // see also start procedure! - let mut current_tip = 0; - let mut blocks_offered = Vec::::new(); - 'main_loop: loop { - // note: we abuse the timeout mechanism to collect some blocks - let timeout_secs = if current_tip == 0 { 20 } else { 10 }; - - let msg_or_timeout = timeout( - Duration::from_secs(timeout_secs), - offer_block_notifier.recv(), - ) - .await; - info!("- current_tip={}", current_tip); - - match msg_or_timeout { - Ok(Some(offer_block_msg)) => { - // collect the offered slots from the channels - match offer_block_msg { - OfferBlockMsg::NextSlot(label, block_offered) => { - info!("<< offered slot from {}: {:?}", label, block_offered); - - // TOOD use .parent instead - if block_offered.parent_slot == current_tip { - current_tip = block_offered.slot; - info!("<< take block from {} as new tip {}", label, current_tip); - assert_ne!(current_tip, 0, "must not see uninitialized tip"); - - emit_block_on_multiplex_output_channel(&sx_multi, current_tip); - tx_tip.send(current_tip).unwrap(); - blocks_offered.clear(); - continue 'main_loop; - } else { - // keep the block for later - blocks_offered.push(block_offered); - continue 'main_loop; - } - } - } - // TODO handle else - } - Ok(None) => { - // TODO double-check - // channel closed - info!("--> channel closed"); - break; - } - Err(_elapsed) => { - // timeout - info!("--> timeout: got these slots: {:?}", blocks_offered); - - // we abuse timeout feature to wait for some blocks to arrive to select the "best" one - if current_tip == 0 { - let start_slot = blocks_offered - .iter() - .max_by(|lhs, rhs| lhs.slot.cmp(&rhs.slot)) - .expect("need at least one slot to start"); - current_tip = start_slot.slot; - assert_ne!(current_tip, 0, "must not see uninitialized tip"); - - emit_block_on_multiplex_output_channel(&sx_multi, current_tip); - tx_tip.send(current_tip).unwrap(); - info!("--> initializing with tip {}", current_tip); - blocks_offered.clear(); - continue 'main_loop; - } - - match blocks_offered - .iter() - .filter(|b| b.parent_slot == current_tip) - .exactly_one() - { - Ok(found_next) => { - current_tip = found_next.slot; - assert_ne!(current_tip, 0, "must not see uninitialized tip"); - tx_tip.send(current_tip).unwrap(); - info!("--> progressing tip to {}", current_tip); - blocks_offered.clear(); - } - Err(_missed) => { - warn!("--> no slots offered - SHOULD ABORT - no hope to progress"); - } - } - - sleep(Duration::from_millis(500)).await; - } - } - } // -- recv loop - - info!("Shutting down merge task."); - }); - - tokio::spawn(async move { - let mut rx_multi = rx_multi; - let mut last_slot = 0; - loop { - let slot = 
rx_multi.recv().await.unwrap(); - assert_ne!(slot, 0, "must not see empty slot"); - info!("==> multiplexed slot: {}", slot); - if slot - last_slot > 1 && last_slot != 0 { - warn!("==> gap: {} -> {}", last_slot, slot); - } - last_slot = slot; - } - }); - - // "infinite" sleep - sleep(Duration::from_secs(1800)).await; - - // TODO proper shutdown - info!("Shutdown completed."); -} - -fn emit_block_on_multiplex_output_channel(sx_multi: &Sender, current_tip: u64) { - sx_multi.send(current_tip).unwrap(); -} - -#[derive(Clone, Debug)] -struct BlockRef { - pub slot: Slot, - pub parent_slot: Slot, -} - -impl From for BlockRef { - fn from(block: SimpleBlockMeta) -> Self { - BlockRef { - slot: block.slot, - parent_slot: block.parent_slot, - } - } -} - -#[derive(Debug)] -enum OfferBlockMsg { - NextSlot(String, BlockRef), -} - -async fn start_progressor( - label: String, - blocks_notifier: Receiver, - mut rx_tip: tokio::sync::watch::Receiver, - offer_block_sender: tokio::sync::mpsc::Sender, -) { - tokio::spawn(async move { - // TODO is .resubscribe what we want? - let mut blocks_notifier = blocks_notifier.resubscribe(); - // for test only - // let start_slot = blocks_notifier.recv().await.unwrap().slot; - - // local copy of tip - let mut local_tip = 0; - - // block after tip offered by this stream - // TODO: block_after_tip is only valid/useful if greater than tip - let mut highest_block: BlockRef = BlockRef { - slot: 0, - parent_slot: 0, - }; - 'main_loop: loop { - select! { - result = rx_tip.changed() => { - if result.is_err() { - debug!("Tip variable closed for {}", label); - break 'main_loop; - } - local_tip = *rx_tip.borrow_and_update(); - info!("++> {} tip changed to {}", label, local_tip); - // TODO update local tip - } - recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip => { - match recv_result { - Ok(block) => { - info!("=> recv on {}: {}",label, format_block(&block)); - if block.slot > local_tip { - info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip); - highest_block = BlockRef::from(block); - offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), highest_block.clone())).await.unwrap(); - // this thread will sleep and not issue any recvs until we get tip.changed signal - continue 'main_loop; - } - } - Err(e) => { - // TODO what to do? - error!("Error receiving block: {}", e); - break 'main_loop; - } - } - } - } - } - }); -} - -fn format_block(block: &SimpleBlockMeta) -> String { - format!( - "{:?}@{} (-> {})", - block.slot, block.commitment_config.commitment, block.parent_slot - ) -} - -fn start_monkey_broadcast(capacity: usize) -> (Sender, Receiver) { - let (tx, monkey_upstream) = tokio::sync::broadcast::channel::(1024); - let (monkey_downstream, rx) = tokio::sync::broadcast::channel::(capacity); - - tokio::spawn(async move { - let mut monkey_upstream = monkey_upstream; - 'recv_loop: for counter in 1.. 
{ - let value = match monkey_upstream.recv().await { - Ok(msg) => msg, - Err(RecvError::Closed) => { - return; - } - Err(RecvError::Lagged(_)) => { - continue; - } - }; - if let Ok(val) = monkey_upstream.recv().await { - val - } else { - return; - }; - // info!("forwarding: {}", value); - // failes if there are no receivers - - if counter % 3 == 0 { - debug!("% delay value (monkey)"); - tokio::time::sleep(Duration::from_millis(700)).await; - } - if counter % 5 == 0 { - debug!("% drop value (monkey)"); - continue 'recv_loop; - } - if counter % 23 == 0 { - debug!("% system outage + reboot (monkey)"); - tokio::time::sleep(Duration::from_secs(5)).await; - } - - let send_result = monkey_downstream.send(value); - match send_result { - Ok(_) => { - debug!("% forwarded"); - } - Err(_) => panic!("Should never happen"), - } - } - }); - - (tx, rx) -} - -#[derive(Debug, Clone)] -struct SimpleBlockMeta { - slot: Slot, - parent_slot: Slot, - commitment_config: CommitmentConfig, -} - -fn create_blockmeta_processing_task( - grpc_addr: String, - grpc_x_token: Option, - block_sx: Sender, - commitment_level: CommitmentLevel, -) -> tokio::task::JoinHandle> { - let mut blocks_subs = HashMap::new(); - blocks_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {}); - - let commitment_config = match commitment_level { - CommitmentLevel::Confirmed => CommitmentConfig::confirmed(), - CommitmentLevel::Finalized => CommitmentConfig::finalized(), - CommitmentLevel::Processed => CommitmentConfig::processed(), - }; - - tokio::spawn(async move { - // connect to grpc - let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?; - let mut stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - Default::default(), - blocks_subs, - Some(commitment_level), - Default::default(), - None, - ) - .await?; - - while let Some(message) = stream.next().await { - let message = message?; - - let Some(update) = message.update_oneof else { - continue; - }; - - match update { - UpdateOneof::BlockMeta(block_meta) => { - let block_meta = process_blockmeta(block_meta, commitment_config); - block_sx - .send(block_meta) - .context("Grpc failed to send a block")?; - } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - u => { - bail!("Unexpected update: {u:?}"); - } - }; - } - bail!("geyser slot stream ended"); - }) -} - -fn process_blockmeta( - block_meta: SubscribeUpdateBlockMeta, - commitment_config: CommitmentConfig, -) -> SimpleBlockMeta { - SimpleBlockMeta { - slot: block_meta.slot, - parent_slot: block_meta.parent_slot, - commitment_config, - } -} diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 157b6a6..78f248d 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -5,7 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::pin::pin; use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock}; -use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig}; +use geyser_grpc_connector::grpc_subscription_autoreconnect::{ + create_geyser_reconnecting_stream, GrpcSourceConfig, +}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -56,9 +58,12 @@ pub async fn main() { let toxiproxy_config = 
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None); - let green_stream = create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized()); - let blue_stream = create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized()); - let toxiproxy_stream = create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized()); + let green_stream = + create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized()); + let blue_stream = + create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized()); + let toxiproxy_stream = + create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized()); let multiplex_stream = create_multiplex( vec![green_stream, blue_stream, toxiproxy_stream], CommitmentConfig::finalized(), diff --git a/examples/stream_blocks_mainnet_channels.rs b/examples/stream_blocks_mainnet_channels.rs deleted file mode 100644 index ca9352d..0000000 --- a/examples/stream_blocks_mainnet_channels.rs +++ /dev/null @@ -1,59 +0,0 @@ -use geyser_grpc_connector::experimental::grpcmultiplex_fastestwins_channels::{ - create_multiplex, GrpcSourceConfig, -}; -/// Deprecated task/channel-based implementation of the multiplexer - use the stream-based one instead -use log::info; -use solana_sdk::commitment_config::CommitmentConfig; -use tokio::sync::broadcast::Receiver; -use tokio::time::{sleep, Duration}; -use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; - -fn start_example_consumer(blocks_notifier: Receiver>) { - tokio::spawn(async move { - let mut blocks_notifier = blocks_notifier; - loop { - let block = blocks_notifier.recv().await.unwrap(); - info!( - "received block #{} with {} txs", - block.slot, - block.transactions.len() - ); - } - }); -} - -#[tokio::main] -pub async fn main() { - // RUST_LOG=info,stream_blocks_mainnet_channels=debug,grpcmultiplex_fastestwins_channels=debug - tracing_subscriber::fmt::init(); - // console_subscriber::init(); - - // mango validator (mainnet) - let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string(); - // via toxiproxy - let grpc_addr_mainnet_triton_toxi = "http://127.0.0.1:10001".to_string(); - // ams81 (mainnet) - let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string(); - // testnet - NOTE: this connection has terrible lags (almost 5 minutes) - // let grpc_addr = "http://147.28.169.13:10000".to_string(); - - let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000); - - let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None); - let blue_config = - GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None); - let toxiproxy_config = - GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None); - - create_multiplex( - vec![green_config, blue_config, toxiproxy_config], - CommitmentConfig::finalized(), - block_sx, - ) - .await; - - start_example_consumer(blocks_notifier); - - // "infinite" sleep - sleep(Duration::from_secs(1800)).await; -} diff --git a/src/experimental/grpcmultiplex_fastestwins_channels.rs b/src/experimental/grpcmultiplex_fastestwins_channels.rs deleted file mode 100644 index fdac894..0000000 --- a/src/experimental/grpcmultiplex_fastestwins_channels.rs +++ /dev/null @@ -1,238 +0,0 @@ -use anyhow::Context; -use async_stream::stream; -use futures::{pin_mut, Stream, StreamExt}; -use itertools::Itertools; -use log::{debug, info, warn}; -use 
solana_sdk::clock::Slot; -use solana_sdk::commitment_config::CommitmentConfig; -/// Deprecated task/channel-based implementation of the multiplexer - use the stream-based one instead -use std::collections::HashMap; -use std::ops::Add; - -use merge_streams::MergeStreams; -use tokio::select; -use tokio::sync::broadcast::Sender; -use tokio::time::{sleep_until, Duration, Instant}; -use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{ - SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock, -}; -use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; - -// use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block}; -// use solana_lite_rpc_core::AnyhowJoinHandle; -// use solana_lite_rpc_core::structures::produced_block::ProducedBlock; - -// TODO map SubscribeUpdateBlock -pub async fn create_multiplex( - grpc_sources: Vec, - commitment_config: CommitmentConfig, - block_sx: Sender>, -) -> tokio::task::JoinHandle> { - assert!( - commitment_config == CommitmentConfig::confirmed() - || commitment_config == CommitmentConfig::finalized(), - "Only CONFIRMED and FINALIZED is supported" - ); - // note: PROCESSED blocks are not sequential in presense of forks; this will break the logic - - if grpc_sources.is_empty() { - panic!("Must have at least one source"); - } - - tokio::spawn(async move { - info!( - "Starting multiplexer with {} sources: {}", - grpc_sources.len(), - grpc_sources - .iter() - .map(|source| source.label.clone()) - .join(", ") - ); - - let mut streams = vec![]; - for grpc_source in grpc_sources { - // note: stream never terminates - let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config) - .await - .map(|update| { - // TODO wrap in new struct and add the source label - update - }); - streams.push(stream); - } - - let merged_streams = streams.merge(); - pin_mut!(merged_streams); - let mut merged_futures = std::pin::pin!(merged_streams.next()); - - let mut current_slot: Slot = 0; - - loop { - let block_cmd = select! { - message = &mut merged_futures => { - match message { - Some(message) => { - map_filter_block_message(current_slot, message, commitment_config) - } - None => { - panic!("source stream is not supposed to terminate"); - } - } - } - }; - - match block_cmd { - BlockCmd::ForwardBlock(block) => { - current_slot = block.slot; - block_sx.send(block).context("send block to downstream")?; - } - BlockCmd::DiscardBlockBehindTip(slot) => { - debug!(". discarding redundant block #{}", slot); - } - BlockCmd::SkipMessage => { - debug!(". skipping this message by type"); - } - } - } - }) -} - -// look Ma, no Clone! 
-#[derive(Debug)] -enum BlockCmd { - ForwardBlock(Box), - DiscardBlockBehindTip(Slot), - // skip geyser messages which are not block related updates - SkipMessage, -} - -fn map_filter_block_message( - current_slot: Slot, - update_message: SubscribeUpdate, - _commitment_config: CommitmentConfig, -) -> BlockCmd { - if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof { - if update_block_message.slot <= current_slot && current_slot != 0 { - // no progress - skip this - return BlockCmd::DiscardBlockBehindTip(update_block_message.slot); - } - - // expensive - // let produced_block = map_produced_block(update_block_message, commitment_config); - - BlockCmd::ForwardBlock(Box::new(update_block_message)) - } else { - BlockCmd::SkipMessage - } -} - -#[derive(Clone, Debug)] -pub struct GrpcSourceConfig { - // symbolic name used in logs - label: String, - grpc_addr: String, - grpc_x_token: Option, - tls_config: Option, -} - -impl GrpcSourceConfig { - pub fn new(label: String, grpc_addr: String, grpc_x_token: Option) -> Self { - Self { - label, - grpc_addr, - grpc_x_token, - tls_config: None, - } - } -} - -// TODO use GrpcSource -// note: stream never terminates -async fn create_geyser_reconnecting_stream( - grpc_source: GrpcSourceConfig, - commitment_config: CommitmentConfig, -) -> impl Stream { - // solana_sdk -> yellowstone - let commitment_level = match commitment_config.commitment { - solana_sdk::commitment_config::CommitmentLevel::Confirmed => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed - } - solana_sdk::commitment_config::CommitmentLevel::Finalized => { - yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized - } - _ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"), - }; - - let label = grpc_source.label.clone(); - stream! 
{ - let mut throttle_barrier = Instant::now(); - 'reconnect_loop: loop { - sleep_until(throttle_barrier).await; - throttle_barrier = Instant::now().add(Duration::from_millis(1000)); - - let connect_result = GeyserGrpcClient::connect_with_timeout( - grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(), - Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; - - let mut client = match connect_result { - Ok(connected_client) => connected_client, - Err(geyser_grpc_client_error) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error); - continue 'reconnect_loop; - } - }; - - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "client".to_string(), - SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - let subscribe_result = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - None, - ).await; - - let geyser_stream = match subscribe_result { - Ok(subscribed_stream) => subscribed_stream, - Err(geyser_grpc_client_error) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error); - continue 'reconnect_loop; - } - }; - - for await update_message in geyser_stream { - match update_message { - Ok(update_message) => { - info!(">message on {}", label); - yield update_message; - } - Err(tonic_status) => { - // TODO identify non-recoverable errors and cancel stream - warn!("Receive error on {} - retrying: {:?}", label, tonic_status); - continue 'reconnect_loop; - } - } - } // -- production loop - - warn!("stream consumer loop {} terminated", label); - } // -- main loop - } // -- stream! -} diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs index f4edc84..cda3fb8 100644 --- a/src/experimental/mod.rs +++ b/src/experimental/mod.rs @@ -1,2 +1 @@ -pub mod grpcmultiplex_fastestwins_channels; pub mod mock_literpc_core; diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index 3cd6a01..6295d8e 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -1,7 +1,5 @@ -use crate::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig}; use async_stream::stream; use futures::Stream; -use itertools::Itertools; use log::{debug, info}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig;