diff --git a/Cargo.lock b/Cargo.lock index b50d1f7..6393336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,6 +737,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.4" @@ -914,6 +920,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.9.0" @@ -1226,6 +1245,7 @@ dependencies = [ "async-stream", "base64 0.21.5", "bincode", + "derive_more", "futures", "itertools", "log", @@ -2742,15 +2762,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" -dependencies = [ - "libc", -] - [[package]] name = "signature" version = "1.6.4" @@ -3506,9 +3517,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index d4777a3..2ed8ede 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ yellowstone-grpc-proto = "1.11.0" solana-sdk = "~1.16.17" async-stream = "0.3.5" -tokio = { version = "1.28.2" , features = ["full"] } +tokio = { version = "1.28" , features = ["rt"] } futures = "0.3.28" anyhow = "1.0.70" log = "0.4.17" @@ -20,6 +20,7 @@ prometheus = "0.13.3" itertools = "0.10.5" base64 = "0.21.5" bincode = "1.3.3" +derive_more = "0.99.17" #[dev-dependencies] #solana-test-validator = "~1.16.17" diff --git a/examples/drain_to_tip_pattern.rs b/examples/drain_to_tip_pattern.rs new file mode 100644 index 0000000..99ceee7 --- /dev/null +++ b/examples/drain_to_tip_pattern.rs @@ -0,0 +1,126 @@ +/// Experiments with the drain-to-tip pattern used for GRPC multiplexing. +/// +use std::fmt::Display; +use derive_more::Display; +use futures::StreamExt; + +use log::{debug, error, info, warn}; +use serde::Serializer; +use tokio::select; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::time::{Duration, sleep}; + +#[derive(Debug, Clone, Display)] +struct Message { + slot: u64, +} + +impl Message { + fn new(slot: u64) -> Self { + Message { slot } + } +} + +#[tokio::main] +async fn main() { + // RUST_LOG=info,stream_via_grpc=debug,drain_to_tip_pattern=debug + tracing_subscriber::fmt::init(); + + let (tx, rx) = tokio::sync::broadcast::channel::(1000); + let (tx_tip, _) = tokio::sync::watch::channel::(Message::new(0)); + + start_progressor(rx, tx_tip.subscribe()).await; + + send_stream(tx.clone()).await; + + // move tip; current tip is 3; next offered slot is 4 + info!("==> force tip to 6 - expect progressor to unblock and offer 7"); + tx_tip.send(Message::new(6)).unwrap(); + + info!("Blocking main thread for some time to allow the system to operate..."); + sleep(tokio::time::Duration::from_secs(4)).await; + info!("Num broadcast subscribers: {}", tx_tip.receiver_count()); + + info!("Shutting down...."); + drop(tx_tip); + sleep(Duration::from_secs(1)).await; + info!("Shutdown completed."); +} + + +// this service is dedicated to one source channel which produces a monotonic stream of messages qualified by slot number +// service maintains a tip variable which is updated by different part of system +// service response to tip changes by blocking until the message received from stream has slot number greater than the tip +// service "offers" this message to the rest of the system +async fn start_progressor(mut blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver) { + info!("Started progressor"); + tokio::spawn(async move { + let mut local_tip = Message::new(3); + // block after tip offered by this stream + // TODO: block_after_tip is only valid/useful if greater than tip + let mut highest_block = Message::new(0); + + 'main_loop: loop { + select! { + result = rx_tip.changed() => { + if result.is_err() { + debug!("Tip variable closed"); + break 'main_loop; + } + local_tip = rx_tip.borrow_and_update().clone(); + info!("++> tip changed to {}", local_tip); + if local_tip.slot <= highest_block.slot { + info!("!! next offered slot is invalid: {} <= {}", highest_block.slot, local_tip); + } + // slow down in case of loop + // sleep(Duration::from_millis(100)).await; + } + + // here goes the strategy: either we get a new block OR a timeout + recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip.slot) => { + debug!("!! block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot); + match recv_result { + Ok(msg) => { + info!("=> recv: {}", msg); + if msg.slot > local_tip.slot { + info!("==> offer next slot ({} -> {})", local_tip, msg.slot); + highest_block = msg; + // offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap(); + // this thread will sleep and not issue any recvs until we get tip.changed signal + continue 'main_loop; + } + } + Err(e) => { + // TODO what to do? + error!("Error receiving block: {}", e); + break 'main_loop; + } + } + } + } + } // -- main loop + + info!("Shutting down progressor."); + }); +} + + +async fn send_stream(message_channel: Sender) { + + // tip is 3 + + // drain 0 to 3; offer 4, then block + for i in 0..10 { + info!("> sending {} (queue size={})", i, message_channel.len()); + message_channel.send(Message::new(i)).unwrap(); + sleep(Duration::from_millis(100)).await; + } + + assert_eq!(message_channel.len(), 5); + +} + + + + + diff --git a/examples/grpcmultiplex_perfectseq.rs b/examples/grpcmultiplex_perfectseq.rs new file mode 100644 index 0000000..a34c714 --- /dev/null +++ b/examples/grpcmultiplex_perfectseq.rs @@ -0,0 +1,392 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::{Display, Formatter}; +use std::ops::Deref; +use std::path::PathBuf; +use std::sync::Arc; +use anyhow::{bail, Context}; +use futures::StreamExt; +use itertools::{ExactlyOneError, Itertools}; + +use log::{debug, error, info, warn}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use tokio::select; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::broadcast::error::{RecvError, TryRecvError}; +use tokio::sync::RwLock; +use tokio::time::{sleep, Duration, timeout}; +use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocksMeta, SubscribeUpdateBlockMeta}; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; + +pub const GRPC_VERSION: &str = "1.16.1"; + +#[tokio::main] +// #[tokio::main(flavor = "multi_thread", worker_threads = 16)] +pub async fn main() { + // RUST_LOG=info,grpcmultiplex_perfectseq=debug,drain_to_tip_pattern=debug + tracing_subscriber::fmt::init(); + + // mango validator (mainnet) + let _grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string(); + // ams81 (mainnet) + let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string(); + // testnet - NOTE: this connection has terrible lags (almost 5 minutes) + // let grpc_addr = "http://147.28.169.13:10000".to_string(); + + // let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000); + let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::(1000); + let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000); + + // TODO ship ProducedBlock + let (sx_multi, mut rx_multi) = tokio::sync::broadcast::channel::(1000); + + let grpc_x_token = None; + let block_confirmed_task_green = create_blockmeta_processing_task( + _grpc_addr_mainnet_triton.clone(), + grpc_x_token.clone(), + block_sx_green.clone(), + CommitmentLevel::Confirmed, + ); + + let block_confirmed_task_blue = create_blockmeta_processing_task( + grpc_addr_mainnet_ams81.clone(), + grpc_x_token.clone(), + block_sx_blue.clone(), + CommitmentLevel::Confirmed, + ); + + + let (tx_tip, _) = tokio::sync::watch::channel::(0); + + let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::(100); + + // producers + start_progressor("green".to_string(), blocks_notifier_green, tx_tip.subscribe(), offer_block_sender.clone()).await; + start_progressor("blue".to_string(), blocks_notifier_blue, tx_tip.subscribe(), offer_block_sender.clone()).await; + + // merge task + // collect the offered slots from the two channels + tokio::spawn(async move { + // need to wait until channels reached slot beyond tip + // tokio::time::sleep(Duration::from_secs(14)).await; + + // see also start procedure! + let mut current_tip = 0; + let mut blocks_offered = Vec::::new(); + 'main_loop: loop { + + // note: we abuse the timeout mechanism to collect some blocks + let timeout_secs = if current_tip == 0 { 20 } else { 10 }; + + let msg_or_timeout = timeout(Duration::from_secs(timeout_secs), offer_block_notifier.recv()).await; + info!("- current_tip={}", current_tip); + + match msg_or_timeout { + Ok(Some(offer_block_msg)) => { + // collect the offered slots from the channels + if let OfferBlockMsg::NextSlot(label, block_offered) = offer_block_msg { + info!("<< offered slot from {}: {:?}", label, block_offered); + + // TOOD use .parent instead + if block_offered.parent_slot == current_tip { + current_tip = block_offered.slot; + info!("<< take block from {} as new tip {}", label, current_tip); + assert_ne!(current_tip, 0, "must not see uninitialized tip"); + + emit_block_on_multiplex_output_channel(&sx_multi, current_tip); + tx_tip.send(current_tip).unwrap(); + blocks_offered.clear(); + continue 'main_loop; + } else { + // keep the block for later + blocks_offered.push(block_offered); + continue 'main_loop; + } + + + } + // TODO handle else + } + Ok(None) => { + // TODO double-check + // channel closed + info!("--> channel closed"); + break; + } + Err(_elapsed) => { + // timeout + info!("--> timeout: got these slots: {:?}", blocks_offered); + + // we abuse timeout feature to wait for some blocks to arrive to select the "best" one + if current_tip == 0 { + let start_slot = blocks_offered.iter().max_by(|lhs,rhs| lhs.slot.cmp(&rhs.slot)).expect("need at least one slot to start"); + current_tip = start_slot.slot; + assert_ne!(current_tip, 0, "must not see uninitialized tip"); + + emit_block_on_multiplex_output_channel(&sx_multi, current_tip); + tx_tip.send(current_tip).unwrap(); + info!("--> initializing with tip {}", current_tip); + blocks_offered.clear(); + continue 'main_loop; + } + + match blocks_offered.iter().filter(|b| b.parent_slot == current_tip).exactly_one() { + Ok(found_next) => { + current_tip = found_next.slot; + assert_ne!(current_tip, 0, "must not see uninitialized tip"); + tx_tip.send(current_tip).unwrap(); + info!("--> progressing tip to {}", current_tip); + blocks_offered.clear(); + } + Err(missed) => { + warn!("--> no slots offered - SHOULD ABORT - no hope to progress"); + } + } + + sleep(Duration::from_millis(500)).await; + } + } + } // -- recv loop + + info!("Shutting down merge task."); + }); + + + tokio::spawn(async move { + let mut rx_multi = rx_multi; + let mut last_slot = 0; + loop { + let slot = rx_multi.recv().await.unwrap(); + assert_ne!(slot, 0, "must not see empty slot"); + info!("==> multiplexed slot: {}", slot); + if slot - last_slot > 1 && last_slot != 0 { + warn!("==> gap: {} -> {}", last_slot, slot); + } + last_slot = slot; + } + }); + + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await; + + // TODO proper shutdown + info!("Shutdown completed."); + +} + +fn emit_block_on_multiplex_output_channel(sx_multi: &Sender, mut current_tip: u64) { + sx_multi.send(current_tip).unwrap(); +} + +#[derive(Clone, Debug)] +struct BlockRef { + pub slot: Slot, + pub parent_slot: Slot, +} + +impl From for BlockRef { + fn from(block: SimpleBlockMeta) -> Self { + BlockRef { + slot: block.slot, + parent_slot: block.parent_slot, + } + } +} + +#[derive(Debug)] +enum OfferBlockMsg { + NextSlot(String, BlockRef), +} + +async fn start_progressor(label: String, blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver, + offer_block_sender: tokio::sync::mpsc::Sender) { + tokio::spawn(async move { + // TODO is .resubscribe what we want? + let mut blocks_notifier = blocks_notifier.resubscribe(); + // for test only + // let start_slot = blocks_notifier.recv().await.unwrap().slot; + + // local copy of tip + let mut local_tip = 0; + + // block after tip offered by this stream + // TODO: block_after_tip is only valid/useful if greater than tip + let mut highest_block: BlockRef = BlockRef { + slot: 0, + parent_slot: 0, + }; + 'main_loop: loop { + select! { + result = rx_tip.changed() => { + if result.is_err() { + debug!("Tip variable closed for {}", label); + break 'main_loop; + } + local_tip = rx_tip.borrow_and_update().clone(); + info!("++> {} tip changed to {}", label, local_tip); + // TODO update local tip + } + recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip) => { + match recv_result { + Ok(block) => { + info!("=> recv on {}: {}",label, format_block(&block)); + if block.slot > local_tip { + info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip); + highest_block = BlockRef::from(block); + offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), highest_block.clone())).await.unwrap(); + // this thread will sleep and not issue any recvs until we get tip.changed signal + continue 'main_loop; + } + } + Err(e) => { + // TODO what to do? + error!("Error receiving block: {}", e); + break 'main_loop; + } + } + } + } + } + }); +} + +fn format_block(block: &SimpleBlockMeta) -> String { + format!("{:?}@{} (-> {})", block.slot, block.commitment_config.commitment, block.parent_slot) +} + + +fn start_monkey_broadcast(capacity: usize) -> (Sender, Receiver) { + let (tx, monkey_upstream) = tokio::sync::broadcast::channel::(1024); + let (monkey_downstream, rx) = tokio::sync::broadcast::channel::(capacity); + + tokio::spawn(async move { + let mut monkey_upstream = monkey_upstream; + 'recv_loop: for counter in 1.. { + let value = match monkey_upstream.recv().await { + Ok(msg) => { + msg + } + Err(RecvError::Closed) => { + return (); + } + Err(RecvError::Lagged(_)) => { + continue; + } + }; + if let Ok(val) = monkey_upstream.recv().await { + val + } else { + return (); + }; + // info!("forwarding: {}", value); + // failes if there are no receivers + + if counter % 3 == 0 { + debug!("% delay value (monkey)"); + tokio::time::sleep(Duration::from_millis(700)).await; + } + if counter % 5 == 0 { + debug!("% drop value (monkey)"); + continue 'recv_loop; + } + if counter % 23 == 0 { + debug!("% system outage + reboot (monkey)"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + + let send_result = monkey_downstream.send(value); + match send_result { + Ok(_) => { + debug!("% forwarded"); + } + Err(_) => panic!("Should never happen") + } + } + }); + + (tx, rx) +} + +#[derive(Debug, Clone)] +struct SimpleBlockMeta { + slot: Slot, + parent_slot: Slot, + commitment_config: CommitmentConfig, +} + + +fn create_blockmeta_processing_task( + grpc_addr: String, + grpc_x_token: Option, + block_sx: Sender, + commitment_level: CommitmentLevel, +) -> tokio::task::JoinHandle> { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "client".to_string(), + SubscribeRequestFilterBlocksMeta {}, + ); + + let commitment_config = match commitment_level { + CommitmentLevel::Confirmed => CommitmentConfig::confirmed(), + CommitmentLevel::Finalized => CommitmentConfig::finalized(), + CommitmentLevel::Processed => CommitmentConfig::processed(), + }; + + let foo = tokio::spawn(async move { + // connect to grpc + let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?; + let mut stream = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Default::default(), + blocks_subs, + Some(commitment_level), + Default::default(), + None, + ) + .await?; + + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::BlockMeta(block_meta) => { + let block_meta = process_blockmeta(block_meta, commitment_config); + block_sx + .send(block_meta) + .context("Grpc failed to send a block")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + u => { + bail!("Unexpected update: {u:?}"); + } + }; + } + bail!("geyser slot stream ended"); + }); + + foo +} + + +fn process_blockmeta(block_meta: SubscribeUpdateBlockMeta, commitment_config: CommitmentConfig) -> SimpleBlockMeta { + SimpleBlockMeta { + slot: block_meta.slot, + parent_slot: block_meta.parent_slot, + commitment_config: commitment_config, + } +} + +