deleted task/channel version

(moved to hacking-task-channel)
This commit is contained in:
GroovieGermanikus 2023-12-15 11:45:00 +01:00
parent 6081426f74
commit a7f957a528
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
8 changed files with 9 additions and 840 deletions

View File

@ -8,18 +8,3 @@ Multiple _Futures_ are executed in parallel and the first result is returned.
No __guarantees__ are made about if the messages are continuous or not.
## _Fastest Wins Strategy_ using task/channel (experimental)
(see `grpcmultiplex_fastestwins_channels.rs`)
This implementation is built with _Tokio Tasks and Channels_ instead of futures.
It is simpler and more predictable than the _Futures_ implementation.
It uses a _push model_ rather than a _pull model_, i.e. the _task_ pushes the result to the _channel_ while the _Futures_ implementation pulls the result from the chain of _Futures_.
## _Perfect Seq Strategy_ (experimental)
(see `grpcmultiplex_perfectseq.rs`)
This strategy **guarantees** that the messages are **continuous** and in the **correct order**.
The _continuous_ property is defined by linking a _Block_ with previous _Block_ using `block.parent_slot` and `block.parent_hash`.
The **disadvantage** is that it will not produce messages if a _Block_ is missing.
It is limited to the commitment levels __confirmed__ and __finalized__. For __processed__ level there is no sequence due to the potential presence of forks with form a tree.

View File

@ -1,115 +0,0 @@
use derive_more::Display;
use log::{debug, error, info};
use tokio::select;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::time::{sleep, Duration};
#[derive(Debug, Clone, Display)]
struct Message {
slot: u64,
}
impl Message {
fn new(slot: u64) -> Self {
Message { slot }
}
}
#[tokio::main]
async fn main() {
// RUST_LOG=info,stream_via_grpc=debug,drain_to_tip_pattern=debug
tracing_subscriber::fmt::init();
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000);
let (tx_tip, _) = tokio::sync::watch::channel::<Message>(Message::new(0));
start_progressor(rx, tx_tip.subscribe()).await;
send_stream(tx.clone()).await;
// move tip; current tip is 3; next offered slot is 4
info!("==> force tip to 6 - expect progressor to unblock and offer 7");
tx_tip.send(Message::new(6)).unwrap();
info!("Blocking main thread for some time to allow the system to operate...");
sleep(tokio::time::Duration::from_secs(4)).await;
info!("Num broadcast subscribers: {}", tx_tip.receiver_count());
info!("Shutting down....");
drop(tx_tip);
sleep(Duration::from_secs(1)).await;
info!("Shutdown completed.");
}
// this service is dedicated to one source channel which produces a monotonic stream of messages qualified by slot number
// service maintains a tip variable which is updated by different part of system
// service response to tip changes by blocking until the message received from stream has slot number greater than the tip
// service "offers" this message to the rest of the system
async fn start_progressor(
mut blocks_notifier: Receiver<Message>,
mut rx_tip: tokio::sync::watch::Receiver<Message>,
) {
info!("Started progressor");
tokio::spawn(async move {
let mut local_tip = Message::new(3);
// block after tip offered by this stream
// TODO: block_after_tip is only valid/useful if greater than tip
let mut highest_block = Message::new(0);
'main_loop: loop {
select! {
result = rx_tip.changed() => {
if result.is_err() {
debug!("Tip variable closed");
break 'main_loop;
}
local_tip = rx_tip.borrow_and_update().clone();
info!("++> tip changed to {}", local_tip);
if local_tip.slot <= highest_block.slot {
info!("!! next offered slot is invalid: {} <= {}", highest_block.slot, local_tip);
}
// slow down in case of loop
// sleep(Duration::from_millis(100)).await;
}
// here goes the strategy: either we get a new block OR a timeout
recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip.slot => {
debug!("!! block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot);
match recv_result {
Ok(msg) => {
info!("=> recv: {}", msg);
if msg.slot > local_tip.slot {
info!("==> offer next slot ({} -> {})", local_tip, msg.slot);
highest_block = msg;
// offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap();
// this thread will sleep and not issue any recvs until we get tip.changed signal
continue 'main_loop;
}
}
Err(e) => {
// TODO what to do?
error!("Error receiving block: {}", e);
break 'main_loop;
}
}
}
}
} // -- main loop
info!("Shutting down progressor.");
});
}
async fn send_stream(message_channel: Sender<Message>) {
// tip is 3
// drain 0 to 3; offer 4, then block
for i in 0..10 {
info!("> sending {} (queue size={})", i, message_channel.len());
message_channel.send(Message::new(i)).unwrap();
sleep(Duration::from_millis(100)).await;
}
assert_eq!(message_channel.len(), 5);
}

View File

@ -1,406 +0,0 @@
use anyhow::{bail, Context};
use futures::StreamExt;
use itertools::Itertools;
use std::collections::HashMap;
use log::{debug, error, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::select;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::time::{sleep, timeout, Duration};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequestFilterBlocksMeta, SubscribeUpdateBlockMeta,
};
pub const GRPC_VERSION: &str = "1.16.1";
#[tokio::main]
// #[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() {
// RUST_LOG=info,grpcmultiplex_perfectseq=debug,drain_to_tip_pattern=debug
tracing_subscriber::fmt::init();
// mango validator (mainnet)
let _grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
// ams81 (mainnet)
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string();
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();
// let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<SimpleBlockMeta>(1000);
let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000);
// TODO ship ProducedBlock
let (sx_multi, rx_multi) = tokio::sync::broadcast::channel::<Slot>(1000);
let grpc_x_token = None;
let _block_confirmed_task_green = create_blockmeta_processing_task(
_grpc_addr_mainnet_triton.clone(),
grpc_x_token.clone(),
block_sx_green.clone(),
CommitmentLevel::Confirmed,
);
let _block_confirmed_task_blue = create_blockmeta_processing_task(
grpc_addr_mainnet_ams81.clone(),
grpc_x_token.clone(),
block_sx_blue.clone(),
CommitmentLevel::Confirmed,
);
let (tx_tip, _) = tokio::sync::watch::channel::<Slot>(0);
let (offer_block_sender, mut offer_block_notifier) =
tokio::sync::mpsc::channel::<OfferBlockMsg>(100);
// producers
start_progressor(
"green".to_string(),
blocks_notifier_green,
tx_tip.subscribe(),
offer_block_sender.clone(),
)
.await;
start_progressor(
"blue".to_string(),
blocks_notifier_blue,
tx_tip.subscribe(),
offer_block_sender.clone(),
)
.await;
// merge task
// collect the offered slots from the two channels
tokio::spawn(async move {
// need to wait until channels reached slot beyond tip
// tokio::time::sleep(Duration::from_secs(14)).await;
// see also start procedure!
let mut current_tip = 0;
let mut blocks_offered = Vec::<BlockRef>::new();
'main_loop: loop {
// note: we abuse the timeout mechanism to collect some blocks
let timeout_secs = if current_tip == 0 { 20 } else { 10 };
let msg_or_timeout = timeout(
Duration::from_secs(timeout_secs),
offer_block_notifier.recv(),
)
.await;
info!("- current_tip={}", current_tip);
match msg_or_timeout {
Ok(Some(offer_block_msg)) => {
// collect the offered slots from the channels
match offer_block_msg {
OfferBlockMsg::NextSlot(label, block_offered) => {
info!("<< offered slot from {}: {:?}", label, block_offered);
// TOOD use .parent instead
if block_offered.parent_slot == current_tip {
current_tip = block_offered.slot;
info!("<< take block from {} as new tip {}", label, current_tip);
assert_ne!(current_tip, 0, "must not see uninitialized tip");
emit_block_on_multiplex_output_channel(&sx_multi, current_tip);
tx_tip.send(current_tip).unwrap();
blocks_offered.clear();
continue 'main_loop;
} else {
// keep the block for later
blocks_offered.push(block_offered);
continue 'main_loop;
}
}
}
// TODO handle else
}
Ok(None) => {
// TODO double-check
// channel closed
info!("--> channel closed");
break;
}
Err(_elapsed) => {
// timeout
info!("--> timeout: got these slots: {:?}", blocks_offered);
// we abuse timeout feature to wait for some blocks to arrive to select the "best" one
if current_tip == 0 {
let start_slot = blocks_offered
.iter()
.max_by(|lhs, rhs| lhs.slot.cmp(&rhs.slot))
.expect("need at least one slot to start");
current_tip = start_slot.slot;
assert_ne!(current_tip, 0, "must not see uninitialized tip");
emit_block_on_multiplex_output_channel(&sx_multi, current_tip);
tx_tip.send(current_tip).unwrap();
info!("--> initializing with tip {}", current_tip);
blocks_offered.clear();
continue 'main_loop;
}
match blocks_offered
.iter()
.filter(|b| b.parent_slot == current_tip)
.exactly_one()
{
Ok(found_next) => {
current_tip = found_next.slot;
assert_ne!(current_tip, 0, "must not see uninitialized tip");
tx_tip.send(current_tip).unwrap();
info!("--> progressing tip to {}", current_tip);
blocks_offered.clear();
}
Err(_missed) => {
warn!("--> no slots offered - SHOULD ABORT - no hope to progress");
}
}
sleep(Duration::from_millis(500)).await;
}
}
} // -- recv loop
info!("Shutting down merge task.");
});
tokio::spawn(async move {
let mut rx_multi = rx_multi;
let mut last_slot = 0;
loop {
let slot = rx_multi.recv().await.unwrap();
assert_ne!(slot, 0, "must not see empty slot");
info!("==> multiplexed slot: {}", slot);
if slot - last_slot > 1 && last_slot != 0 {
warn!("==> gap: {} -> {}", last_slot, slot);
}
last_slot = slot;
}
});
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
// TODO proper shutdown
info!("Shutdown completed.");
}
fn emit_block_on_multiplex_output_channel(sx_multi: &Sender<Slot>, current_tip: u64) {
sx_multi.send(current_tip).unwrap();
}
#[derive(Clone, Debug)]
struct BlockRef {
pub slot: Slot,
pub parent_slot: Slot,
}
impl From<SimpleBlockMeta> for BlockRef {
fn from(block: SimpleBlockMeta) -> Self {
BlockRef {
slot: block.slot,
parent_slot: block.parent_slot,
}
}
}
#[derive(Debug)]
enum OfferBlockMsg {
NextSlot(String, BlockRef),
}
async fn start_progressor(
label: String,
blocks_notifier: Receiver<SimpleBlockMeta>,
mut rx_tip: tokio::sync::watch::Receiver<Slot>,
offer_block_sender: tokio::sync::mpsc::Sender<OfferBlockMsg>,
) {
tokio::spawn(async move {
// TODO is .resubscribe what we want?
let mut blocks_notifier = blocks_notifier.resubscribe();
// for test only
// let start_slot = blocks_notifier.recv().await.unwrap().slot;
// local copy of tip
let mut local_tip = 0;
// block after tip offered by this stream
// TODO: block_after_tip is only valid/useful if greater than tip
let mut highest_block: BlockRef = BlockRef {
slot: 0,
parent_slot: 0,
};
'main_loop: loop {
select! {
result = rx_tip.changed() => {
if result.is_err() {
debug!("Tip variable closed for {}", label);
break 'main_loop;
}
local_tip = *rx_tip.borrow_and_update();
info!("++> {} tip changed to {}", label, local_tip);
// TODO update local tip
}
recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip => {
match recv_result {
Ok(block) => {
info!("=> recv on {}: {}",label, format_block(&block));
if block.slot > local_tip {
info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip);
highest_block = BlockRef::from(block);
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), highest_block.clone())).await.unwrap();
// this thread will sleep and not issue any recvs until we get tip.changed signal
continue 'main_loop;
}
}
Err(e) => {
// TODO what to do?
error!("Error receiving block: {}", e);
break 'main_loop;
}
}
}
}
}
});
}
fn format_block(block: &SimpleBlockMeta) -> String {
format!(
"{:?}@{} (-> {})",
block.slot, block.commitment_config.commitment, block.parent_slot
)
}
fn start_monkey_broadcast<T: Clone + Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let (tx, monkey_upstream) = tokio::sync::broadcast::channel::<T>(1024);
let (monkey_downstream, rx) = tokio::sync::broadcast::channel::<T>(capacity);
tokio::spawn(async move {
let mut monkey_upstream = monkey_upstream;
'recv_loop: for counter in 1.. {
let value = match monkey_upstream.recv().await {
Ok(msg) => msg,
Err(RecvError::Closed) => {
return;
}
Err(RecvError::Lagged(_)) => {
continue;
}
};
if let Ok(val) = monkey_upstream.recv().await {
val
} else {
return;
};
// info!("forwarding: {}", value);
// failes if there are no receivers
if counter % 3 == 0 {
debug!("% delay value (monkey)");
tokio::time::sleep(Duration::from_millis(700)).await;
}
if counter % 5 == 0 {
debug!("% drop value (monkey)");
continue 'recv_loop;
}
if counter % 23 == 0 {
debug!("% system outage + reboot (monkey)");
tokio::time::sleep(Duration::from_secs(5)).await;
}
let send_result = monkey_downstream.send(value);
match send_result {
Ok(_) => {
debug!("% forwarded");
}
Err(_) => panic!("Should never happen"),
}
}
});
(tx, rx)
}
#[derive(Debug, Clone)]
struct SimpleBlockMeta {
slot: Slot,
parent_slot: Slot,
commitment_config: CommitmentConfig,
}
fn create_blockmeta_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: Sender<SimpleBlockMeta>,
commitment_level: CommitmentLevel,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
let mut blocks_subs = HashMap::new();
blocks_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
let commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),
};
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
let mut stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Default::default(),
blocks_subs,
Some(commitment_level),
Default::default(),
None,
)
.await?;
while let Some(message) = stream.next().await {
let message = message?;
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::BlockMeta(block_meta) => {
let block_meta = process_blockmeta(block_meta, commitment_config);
block_sx
.send(block_meta)
.context("Grpc failed to send a block")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
u => {
bail!("Unexpected update: {u:?}");
}
};
}
bail!("geyser slot stream ended");
})
}
fn process_blockmeta(
block_meta: SubscribeUpdateBlockMeta,
commitment_config: CommitmentConfig,
) -> SimpleBlockMeta {
SimpleBlockMeta {
slot: block_meta.slot,
parent_slot: block_meta.parent_slot,
commitment_config,
}
}

View File

@ -5,7 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::pin::pin;
use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -56,9 +58,12 @@ pub async fn main() {
let toxiproxy_config =
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);
let green_stream = create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
let blue_stream = create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
let toxiproxy_stream = create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
let green_stream =
create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized());
let blue_stream =
create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized());
let toxiproxy_stream =
create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized());
let multiplex_stream = create_multiplex(
vec![green_stream, blue_stream, toxiproxy_stream],
CommitmentConfig::finalized(),

View File

@ -1,59 +0,0 @@
use geyser_grpc_connector::experimental::grpcmultiplex_fastestwins_channels::{
create_multiplex, GrpcSourceConfig,
};
/// Deprecated task/channel-based implementation of the multiplexer - use the stream-based one instead
use log::info;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::sync::broadcast::Receiver;
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;
fn start_example_consumer(blocks_notifier: Receiver<Box<SubscribeUpdateBlock>>) {
tokio::spawn(async move {
let mut blocks_notifier = blocks_notifier;
loop {
let block = blocks_notifier.recv().await.unwrap();
info!(
"received block #{} with {} txs",
block.slot,
block.transactions.len()
);
}
});
}
#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet_channels=debug,grpcmultiplex_fastestwins_channels=debug
tracing_subscriber::fmt::init();
// console_subscriber::init();
// mango validator (mainnet)
let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
// via toxiproxy
let grpc_addr_mainnet_triton_toxi = "http://127.0.0.1:10001".to_string();
// ams81 (mainnet)
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string();
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();
let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(1000);
let green_config = GrpcSourceConfig::new("triton".to_string(), grpc_addr_mainnet_triton, None);
let blue_config =
GrpcSourceConfig::new("mangoams81".to_string(), grpc_addr_mainnet_ams81, None);
let toxiproxy_config =
GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None);
create_multiplex(
vec![green_config, blue_config, toxiproxy_config],
CommitmentConfig::finalized(),
block_sx,
)
.await;
start_example_consumer(blocks_notifier);
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}

View File

@ -1,238 +0,0 @@
use anyhow::Context;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use itertools::Itertools;
use log::{debug, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
/// Deprecated task/channel-based implementation of the multiplexer - use the stream-based one instead
use std::collections::HashMap;
use std::ops::Add;
use merge_streams::MergeStreams;
use tokio::select;
use tokio::sync::broadcast::Sender;
use tokio::time::{sleep_until, Duration, Instant};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
// use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
// use solana_lite_rpc_core::AnyhowJoinHandle;
// use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
// TODO map SubscribeUpdateBlock
pub async fn create_multiplex(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
block_sx: Sender<Box<SubscribeUpdateBlock>>,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
assert!(
commitment_config == CommitmentConfig::confirmed()
|| commitment_config == CommitmentConfig::finalized(),
"Only CONFIRMED and FINALIZED is supported"
);
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic
if grpc_sources.is_empty() {
panic!("Must have at least one source");
}
tokio::spawn(async move {
info!(
"Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources
.iter()
.map(|source| source.label.clone())
.join(", ")
);
let mut streams = vec![];
for grpc_source in grpc_sources {
// note: stream never terminates
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), commitment_config)
.await
.map(|update| {
// TODO wrap in new struct and add the source label
update
});
streams.push(stream);
}
let merged_streams = streams.merge();
pin_mut!(merged_streams);
let mut merged_futures = std::pin::pin!(merged_streams.next());
let mut current_slot: Slot = 0;
loop {
let block_cmd = select! {
message = &mut merged_futures => {
match message {
Some(message) => {
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("source stream is not supposed to terminate");
}
}
}
};
match block_cmd {
BlockCmd::ForwardBlock(block) => {
current_slot = block.slot;
block_sx.send(block).context("send block to downstream")?;
}
BlockCmd::DiscardBlockBehindTip(slot) => {
debug!(". discarding redundant block #{}", slot);
}
BlockCmd::SkipMessage => {
debug!(". skipping this message by type");
}
}
}
})
}
// look Ma, no Clone!
#[derive(Debug)]
enum BlockCmd {
ForwardBlock(Box<SubscribeUpdateBlock>),
DiscardBlockBehindTip(Slot),
// skip geyser messages which are not block related updates
SkipMessage,
}
fn map_filter_block_message(
current_slot: Slot,
update_message: SubscribeUpdate,
_commitment_config: CommitmentConfig,
) -> BlockCmd {
if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof {
if update_block_message.slot <= current_slot && current_slot != 0 {
// no progress - skip this
return BlockCmd::DiscardBlockBehindTip(update_block_message.slot);
}
// expensive
// let produced_block = map_produced_block(update_block_message, commitment_config);
BlockCmd::ForwardBlock(Box::new(update_block_message))
} else {
BlockCmd::SkipMessage
}
}
#[derive(Clone, Debug)]
pub struct GrpcSourceConfig {
// symbolic name used in logs
label: String,
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
}
impl GrpcSourceConfig {
pub fn new(label: String, grpc_addr: String, grpc_x_token: Option<String>) -> Self {
Self {
label,
grpc_addr,
grpc_x_token,
tls_config: None,
}
}
}
// TODO use GrpcSource
// note: stream never terminates
async fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
commitment_config: CommitmentConfig,
) -> impl Stream<Item = SubscribeUpdate> {
// solana_sdk -> yellowstone
let commitment_level = match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Confirmed => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Confirmed
}
solana_sdk::commitment_config::CommitmentLevel::Finalized => {
yellowstone_grpc_proto::prelude::CommitmentLevel::Finalized
}
_ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"),
};
let label = grpc_source.label.clone();
stream! {
let mut throttle_barrier = Instant::now();
'reconnect_loop: loop {
sleep_until(throttle_barrier).await;
throttle_barrier = Instant::now().add(Duration::from_millis(1000));
let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
let mut client = match connect_result {
Ok(connected_client) => connected_client,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Connect failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
}
};
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let subscribe_result = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
).await;
let geyser_stream = match subscribe_result {
Ok(subscribed_stream) => subscribed_stream,
Err(geyser_grpc_client_error) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Subscribe failed on {} - retrying: {:?}", label, geyser_grpc_client_error);
continue 'reconnect_loop;
}
};
for await update_message in geyser_stream {
match update_message {
Ok(update_message) => {
info!(">message on {}", label);
yield update_message;
}
Err(tonic_status) => {
// TODO identify non-recoverable errors and cancel stream
warn!("Receive error on {} - retrying: {:?}", label, tonic_status);
continue 'reconnect_loop;
}
}
} // -- production loop
warn!("stream consumer loop {} terminated", label);
} // -- main loop
} // -- stream!
}

View File

@ -1,2 +1 @@
pub mod grpcmultiplex_fastestwins_channels;
pub mod mock_literpc_core;

View File

@ -1,7 +1,5 @@
use crate::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig};
use async_stream::stream;
use futures::Stream;
use itertools::Itertools;
use log::{debug, info};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;