2024-02-07 06:51:22 -08:00
|
|
|
use crate::grpc_subscription::from_grpc_block_update;
|
|
|
|
use anyhow::{bail, Context};
|
2024-01-21 04:56:40 -08:00
|
|
|
use futures::{Stream, StreamExt};
|
2024-02-07 06:51:22 -08:00
|
|
|
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
|
|
|
|
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
|
|
|
|
use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig, Message};
|
|
|
|
use itertools::Itertools;
|
2024-01-08 05:21:05 -08:00
|
|
|
use log::{debug, info, trace, warn};
|
2024-02-07 06:51:22 -08:00
|
|
|
use merge_streams::MergeStreams;
|
2023-12-22 05:42:20 -08:00
|
|
|
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
|
|
|
|
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
|
|
|
|
use solana_lite_rpc_core::AnyhowJoinHandle;
|
|
|
|
use solana_sdk::clock::Slot;
|
|
|
|
use solana_sdk::commitment_config::CommitmentConfig;
|
2024-01-21 04:56:40 -08:00
|
|
|
use std::collections::{BTreeSet, HashMap, HashSet};
|
2023-12-22 05:42:20 -08:00
|
|
|
use std::time::Duration;
|
|
|
|
use tokio::sync::broadcast::Receiver;
|
2024-01-21 04:56:40 -08:00
|
|
|
use tokio::sync::mpsc::UnboundedSender;
|
2024-02-07 06:51:22 -08:00
|
|
|
use tokio::task::AbortHandle;
|
|
|
|
use tokio::time::sleep;
|
|
|
|
use tokio_stream::wrappers::ReceiverStream;
|
2023-12-22 05:42:20 -08:00
|
|
|
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
2024-01-17 06:17:49 -08:00
|
|
|
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
|
2023-12-22 05:42:20 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
/// connect to all sources provided using transparent autoconnection task
|
|
|
|
/// shutdown handling:
|
|
|
|
/// - task will shutdown of the receiver side of block_sender gets closed
|
|
|
|
/// - will also shutdown the grpc autoconnection task(s)
|
2024-01-22 23:17:48 -08:00
|
|
|
fn create_grpc_multiplex_processed_block_stream(
|
2024-01-21 04:56:40 -08:00
|
|
|
grpc_sources: &Vec<GrpcSourceConfig>,
|
2024-02-07 06:51:22 -08:00
|
|
|
block_sender: UnboundedSender<ProducedBlock>,
|
|
|
|
) -> Vec<AbortHandle> {
|
2024-01-22 01:54:39 -08:00
|
|
|
let commitment_config = CommitmentConfig::processed();
|
2024-01-21 04:56:40 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
let mut channels = vec![];
|
2024-01-21 04:56:40 -08:00
|
|
|
for grpc_source in grpc_sources {
|
2024-02-07 06:51:22 -08:00
|
|
|
// tasks will be shutdown automatically if the channel gets closed
|
|
|
|
let (_jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
|
|
|
|
grpc_source.clone(),
|
|
|
|
GeyserFilter(commitment_config).blocks_and_txs(),
|
|
|
|
);
|
|
|
|
channels.push(message_channel)
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
|
|
|
|
let source_channels = channels.into_iter().map(ReceiverStream::new).collect_vec();
|
|
|
|
let mut fused_streams = source_channels.merge();
|
|
|
|
|
|
|
|
let jh_merging_streams = tokio::task::spawn(async move {
|
2024-01-21 04:56:40 -08:00
|
|
|
let mut slots_processed = BTreeSet::<u64>::new();
|
|
|
|
loop {
|
|
|
|
const MAX_SIZE: usize = 1024;
|
2024-02-07 06:51:22 -08:00
|
|
|
match fused_streams.next().await {
|
|
|
|
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
|
|
|
|
let mapfilter =
|
|
|
|
map_block_from_yellowstone_update(*subscribe_update, commitment_config);
|
|
|
|
if let Some((slot, produced_block)) = mapfilter {
|
|
|
|
let commitment_level_block = produced_block.commitment_config.commitment;
|
|
|
|
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
|
|
|
|
// it means that the slot is too old to process
|
|
|
|
if !slots_processed.contains(&slot)
|
|
|
|
&& (slots_processed.len() < MAX_SIZE / 2
|
|
|
|
|| slot > slots_processed.first().cloned().unwrap_or_default())
|
|
|
|
{
|
|
|
|
let send_result = block_sender
|
|
|
|
.send(produced_block)
|
|
|
|
.context("Send block to channel");
|
|
|
|
if send_result.is_err() {
|
|
|
|
warn!("Block channel receiver is closed - aborting");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
trace!(
|
|
|
|
"emitted block #{}@{} from multiplexer",
|
|
|
|
slot,
|
|
|
|
commitment_level_block
|
|
|
|
);
|
|
|
|
|
|
|
|
slots_processed.insert(slot);
|
|
|
|
if slots_processed.len() > MAX_SIZE {
|
|
|
|
slots_processed.pop_first();
|
|
|
|
}
|
|
|
|
}
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
Some(Message::Connecting(attempt)) => {
|
|
|
|
if attempt > 1 {
|
|
|
|
warn!(
|
|
|
|
"Multiplexed geyser stream performs reconnect attempt {}",
|
|
|
|
attempt
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
warn!("Multiplexed geyser source stream terminated - aborting task");
|
|
|
|
return;
|
|
|
|
}
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
} // -- END receiver loop
|
2024-01-21 04:56:40 -08:00
|
|
|
});
|
2024-02-07 06:51:22 -08:00
|
|
|
vec![jh_merging_streams.abort_handle()]
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
fn create_grpc_multiplex_block_meta_stream(
|
|
|
|
grpc_sources: &Vec<GrpcSourceConfig>,
|
|
|
|
commitment_config: CommitmentConfig,
|
2024-02-07 06:51:22 -08:00
|
|
|
) -> impl Stream<Item = BlockMeta> {
|
|
|
|
let mut channels = vec![];
|
2024-01-21 04:56:40 -08:00
|
|
|
for grpc_source in grpc_sources {
|
2024-02-07 06:51:22 -08:00
|
|
|
let (_jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
|
2024-01-21 04:56:40 -08:00
|
|
|
grpc_source.clone(),
|
|
|
|
GeyserFilter(commitment_config).blocks_meta(),
|
|
|
|
);
|
2024-02-07 06:51:22 -08:00
|
|
|
channels.push(message_channel)
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
|
|
|
|
let source_channels = channels.into_iter().map(ReceiverStream::new).collect_vec();
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
commitment_config != CommitmentConfig::processed(),
|
|
|
|
"fastestwins strategy must not be used for processed level"
|
|
|
|
);
|
|
|
|
geyser_grpc_connector::grpcmultiplex_fastestwins::create_multiplexed_stream(
|
|
|
|
source_channels,
|
|
|
|
BlockMetaExtractor(commitment_config),
|
|
|
|
)
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
/// connect to multiple grpc sources to consume processed blocks and block status update
|
|
|
|
/// emits full blocks for commitment levels processed, confirmed, finalized in that order
|
|
|
|
/// the channel must never be closed
|
2023-12-22 05:42:20 -08:00
|
|
|
pub fn create_grpc_multiplex_blocks_subscription(
|
|
|
|
grpc_sources: Vec<GrpcSourceConfig>,
|
|
|
|
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
|
|
|
|
info!("Setup grpc multiplexed blocks connection...");
|
|
|
|
if grpc_sources.is_empty() {
|
|
|
|
info!("- no grpc connection configured");
|
|
|
|
}
|
|
|
|
for grpc_source in &grpc_sources {
|
|
|
|
info!("- connection to {}", grpc_source);
|
|
|
|
}
|
|
|
|
|
2024-01-08 05:14:59 -08:00
|
|
|
// return value is the broadcast receiver
|
2024-02-07 06:51:22 -08:00
|
|
|
// must NEVER be closed form inside this method
|
2024-01-08 05:14:59 -08:00
|
|
|
let (producedblock_sender, blocks_output_stream) =
|
2024-02-06 01:08:18 -08:00
|
|
|
tokio::sync::broadcast::channel::<ProducedBlock>(32);
|
2024-01-08 05:14:59 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
let mut reconnect_attempts = 0;
|
2024-01-21 04:56:40 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
// task MUST not terminate but might be aborted from outside
|
|
|
|
let jh_block_emitter_task = tokio::task::spawn(async move {
|
|
|
|
// channel must NEVER GET CLOSED
|
|
|
|
let (processed_block_sender, mut processed_block_reciever) =
|
|
|
|
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();
|
2024-01-21 04:56:40 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
loop {
|
|
|
|
let processed_block_sender = processed_block_sender.clone();
|
|
|
|
reconnect_attempts += 1;
|
|
|
|
if reconnect_attempts > 1 {
|
|
|
|
warn!(
|
|
|
|
"Multiplexed geyser stream performs reconnect attempt {}",
|
|
|
|
reconnect_attempts
|
2024-01-21 04:56:40 -08:00
|
|
|
);
|
2024-02-07 06:51:22 -08:00
|
|
|
}
|
2024-01-12 05:21:54 -08:00
|
|
|
|
2024-02-12 02:53:52 -08:00
|
|
|
// tasks which should be cleaned up uppon reconnect
|
|
|
|
let mut task_list: Vec<AbortHandle> = vec![];
|
|
|
|
|
|
|
|
let processed_blocks_tasks =
|
2024-02-07 06:51:22 -08:00
|
|
|
create_grpc_multiplex_processed_block_stream(&grpc_sources, processed_block_sender);
|
2024-02-12 02:53:52 -08:00
|
|
|
task_list.extend(processed_blocks_tasks);
|
2024-01-12 05:21:54 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
let confirmed_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
|
|
|
|
&grpc_sources,
|
|
|
|
CommitmentConfig::confirmed(),
|
|
|
|
);
|
|
|
|
let finalized_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
|
|
|
|
&grpc_sources,
|
|
|
|
CommitmentConfig::finalized(),
|
|
|
|
);
|
|
|
|
|
|
|
|
// by blockhash
|
2024-02-12 02:53:52 -08:00
|
|
|
// this map consumes sigificant amount of memory constrainted by CLEANUP_SLOTS_BEHIND_FINALIZED
|
2024-02-07 06:51:22 -08:00
|
|
|
let mut recent_processed_blocks = HashMap::<String, ProducedBlock>::new();
|
|
|
|
// both streams support backpressure, see log:
|
|
|
|
// grpc_subscription_autoreconnect_tasks: downstream receiver did not pick put message for 500ms - keep waiting
|
|
|
|
let mut confirmed_blockmeta_stream = std::pin::pin!(confirmed_blockmeta_stream);
|
|
|
|
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);
|
|
|
|
|
|
|
|
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
|
|
|
|
let mut last_finalized_slot: Slot = 0;
|
2024-02-12 02:53:52 -08:00
|
|
|
const CLEANUP_SLOTS_BEHIND_FINALIZED: u64 = 100;
|
2024-02-07 06:51:22 -08:00
|
|
|
let mut cleanup_without_recv_full_blocks: u8 = 0;
|
|
|
|
let mut cleanup_without_confirmed_recv_blocks_meta: u8 = 0;
|
|
|
|
let mut cleanup_without_finalized_recv_blocks_meta: u8 = 0;
|
|
|
|
let mut confirmed_block_not_yet_processed = HashSet::<String>::new();
|
|
|
|
|
|
|
|
// start logging errors when we recieve first finalized block
|
|
|
|
let mut startup_completed = false;
|
|
|
|
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
|
|
|
|
'recv_loop: loop {
|
|
|
|
tokio::select! {
|
|
|
|
processed_block = processed_block_reciever.recv() => {
|
2024-02-12 02:53:52 -08:00
|
|
|
cleanup_without_recv_full_blocks = 0;
|
|
|
|
|
2024-01-21 04:56:40 -08:00
|
|
|
let processed_block = processed_block.expect("processed block from stream");
|
|
|
|
trace!("got processed block {} with blockhash {}",
|
|
|
|
processed_block.slot, processed_block.blockhash.clone());
|
|
|
|
if let Err(e) = producedblock_sender.send(processed_block.clone()) {
|
|
|
|
warn!("produced block channel has no receivers {e:?}");
|
2024-01-12 05:21:54 -08:00
|
|
|
}
|
2024-01-21 04:56:40 -08:00
|
|
|
if confirmed_block_not_yet_processed.remove(&processed_block.blockhash) {
|
|
|
|
if let Err(e) = producedblock_sender.send(processed_block.to_confirmed_block()) {
|
2024-02-07 06:51:22 -08:00
|
|
|
warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
recent_processed_blocks.insert(processed_block.blockhash.clone(), processed_block);
|
|
|
|
},
|
|
|
|
meta_confirmed = confirmed_blockmeta_stream.next() => {
|
|
|
|
cleanup_without_confirmed_recv_blocks_meta = 0;
|
2024-02-07 06:51:22 -08:00
|
|
|
let meta_confirmed = meta_confirmed.expect("confirmed block meta from stream");
|
|
|
|
let blockhash = meta_confirmed.blockhash;
|
2024-01-21 04:56:40 -08:00
|
|
|
if let Some(cached_processed_block) = recent_processed_blocks.get(&blockhash) {
|
|
|
|
let confirmed_block = cached_processed_block.to_confirmed_block();
|
|
|
|
debug!("got confirmed blockmeta {} with blockhash {}",
|
|
|
|
confirmed_block.slot, confirmed_block.blockhash.clone());
|
|
|
|
if let Err(e) = producedblock_sender.send(confirmed_block) {
|
2024-02-07 06:51:22 -08:00
|
|
|
warn!("confirmed block channel has no receivers {e:?}");
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
confirmed_block_not_yet_processed.insert(blockhash.clone());
|
2024-02-07 06:51:22 -08:00
|
|
|
log::debug!("backlog of not yset confirmed blocks: {}", confirmed_block_not_yet_processed.len());
|
2024-01-21 04:56:40 -08:00
|
|
|
}
|
2024-01-12 05:21:54 -08:00
|
|
|
},
|
|
|
|
meta_finalized = finalized_blockmeta_stream.next() => {
|
2024-01-21 04:56:40 -08:00
|
|
|
cleanup_without_finalized_recv_blocks_meta = 0;
|
2024-02-07 06:51:22 -08:00
|
|
|
let meta_finalized = meta_finalized.expect("finalized block meta from stream");
|
|
|
|
let blockhash = meta_finalized.blockhash;
|
2024-01-21 04:56:40 -08:00
|
|
|
if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) {
|
|
|
|
let finalized_block = cached_processed_block.to_finalized_block();
|
2024-01-12 05:21:54 -08:00
|
|
|
last_finalized_slot = finalized_block.slot;
|
2024-01-22 23:17:48 -08:00
|
|
|
startup_completed = true;
|
2024-01-12 05:21:54 -08:00
|
|
|
debug!("got finalized blockmeta {} with blockhash {}",
|
|
|
|
finalized_block.slot, finalized_block.blockhash.clone());
|
|
|
|
if let Err(e) = producedblock_sender.send(finalized_block) {
|
|
|
|
warn!("Finalized block channel has no receivers {e:?}");
|
|
|
|
}
|
2024-01-22 23:17:48 -08:00
|
|
|
} else if startup_completed {
|
2024-01-15 11:22:59 -08:00
|
|
|
// this warning is ok for first few blocks when we start lrpc
|
Production (#313)
* fix: panic on geyser close, multiplex bug
https://github.com/blockworks-foundation/geyser-grpc-connector/issues/3
* update Cargo.lock
* reverting cargo.lock
* Fix issues with grpc and postgres
* Solving merge issues
* Fixing cargo fmt
* Increase finish quic timeout (#280) (#281)
* integrate geyser slot subscription (#283)
* Increase finish quic timeout (#280)
* Moving geyser slot subscription from stream to channels (#282)
* Moving geyser slot subscription from stream to channels
* Closing all the slot subscription tasks incase of restart
* Making slot channel unbounded (bug)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* Update cargolock file
* Fixing clippy removing grpc_inspect
* merging main with production (#290)
* remove block_debug_listen (#286)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* clippy
* Fixing message too long and overflow panics (#288)
* Update geyser grpc connector commit (#289)
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
* Merging MTU changes and setting up transportation config (#293)
* remove block_debug_listen (#286)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* clippy
* Fixing message too long and overflow panics (#288)
* Update geyser grpc connector commit (#289)
* Updating the transport config to match with solana endpoint (#292)
* Updating the transport config to match with solana endpoint
* Setting max MTU after groovies comments
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
* Making block subscription processed and moving confirmed block subscr… (#291)
* Making block subscription processed and moving confirmed block subscription to meta
* Sending both processed and confirmed blocks, if block has already been confirmed
* Minor bug, subscribing to processed blocks instead of confirmed (#295)
* reduce log
* Restart connection if any stream fails (#306)
* Main into production 01/02/2024 (#312)
* Production into main (#303)
* fix: panic on geyser close, multiplex bug
https://github.com/blockworks-foundation/geyser-grpc-connector/issues/3
* update Cargo.lock
* reverting cargo.lock
* Fix issues with grpc and postgres
* Solving merge issues
* Fixing cargo fmt
* Increase finish quic timeout (#280) (#281)
* integrate geyser slot subscription (#283)
* Increase finish quic timeout (#280)
* Moving geyser slot subscription from stream to channels (#282)
* Moving geyser slot subscription from stream to channels
* Closing all the slot subscription tasks incase of restart
* Making slot channel unbounded (bug)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* Update cargolock file
* Fixing clippy removing grpc_inspect
* merging main with production (#290)
* remove block_debug_listen (#286)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* clippy
* Fixing message too long and overflow panics (#288)
* Update geyser grpc connector commit (#289)
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
* Merging MTU changes and setting up transportation config (#293)
* remove block_debug_listen (#286)
* remove block_debug_listen
caused a panic - need more time to investigate
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] thread 'tokio-runtime-worker' panicked at cluster-endpoints/src/grpc_inspect.rs:59:21:
2024-01-17T20:31:42.913 app[683d392fd45368] ams [info] Error receiving block: Closed
2024-01-17T20:31:42.922 app[683d392fd45368] ams [info] 2024-01-17T20:31:42.912597Z ERROR lite_rpc: Services quit unexpectedly Err(cluster endpoint failure (Err(JoinError::Panic(Id(20), ...)), 1, [JoinHandle { id: Id(19) }, JoinHandle { id: Id(23) }])
* clippy
* Fixing message too long and overflow panics (#288)
* Update geyser grpc connector commit (#289)
* Updating the transport config to match with solana endpoint (#292)
* Updating the transport config to match with solana endpoint
* Setting max MTU after groovies comments
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
* Making block subscription processed and moving confirmed block subscr… (#291)
* Making block subscription processed and moving confirmed block subscription to meta
* Sending both processed and confirmed blocks, if block has already been confirmed
* Minor bug, subscribing to processed blocks instead of confirmed (#295)
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
Co-authored-by: GroovieGermanikus <groovie@mango.markets>
* Restart connection if any stream fails
* Updating to version 0.2.4
* Updating the change logs
* Fixing all the sub overflows by using saturating sub (#309)
* Fixing by cu computation adding more tests to check by CU (#311)
* Fixing by cu computation adding more tests to check by CU
* Adding more tests
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
Co-authored-by: GroovieGermanikus <groovie@mango.markets>
---------
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
Co-authored-by: GroovieGermanikus <groovie@mango.markets>
2024-02-01 06:16:24 -08:00
|
|
|
log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
|
2024-01-12 05:21:54 -08:00
|
|
|
}
|
|
|
|
},
|
2024-02-07 06:51:22 -08:00
|
|
|
_ = cleanup_tick.tick() => {
|
2024-02-12 02:53:52 -08:00
|
|
|
// timebased restart
|
2024-02-07 06:51:22 -08:00
|
|
|
if cleanup_without_recv_full_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
|
|
|
|
cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
|
|
|
|
cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
|
|
|
|
log::error!("block or block meta geyser stream stopped - restarting multiplexer ({}-{}-{})",
|
|
|
|
cleanup_without_recv_full_blocks, cleanup_without_confirmed_recv_blocks_meta, cleanup_without_finalized_recv_blocks_meta,);
|
|
|
|
// throttle a bit
|
|
|
|
sleep(Duration::from_millis(1500)).await;
|
|
|
|
break 'recv_loop;
|
|
|
|
}
|
|
|
|
cleanup_without_recv_full_blocks += 1;
|
|
|
|
cleanup_without_confirmed_recv_blocks_meta += 1;
|
|
|
|
cleanup_without_finalized_recv_blocks_meta += 1;
|
|
|
|
let size_before = recent_processed_blocks.len();
|
|
|
|
recent_processed_blocks.retain(|_blockhash, block| {
|
2024-02-12 02:53:52 -08:00
|
|
|
last_finalized_slot == 0 || block.slot > last_finalized_slot.saturating_sub(CLEANUP_SLOTS_BEHIND_FINALIZED)
|
2024-02-07 06:51:22 -08:00
|
|
|
});
|
2024-02-12 02:53:52 -08:00
|
|
|
let cnt_cleaned = size_before.saturating_sub(recent_processed_blocks.len());
|
2024-02-07 06:51:22 -08:00
|
|
|
if cnt_cleaned > 0 {
|
|
|
|
debug!("cleaned {} processed blocks from cache", cnt_cleaned);
|
2024-01-08 03:11:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
} // -- END receiver loop
|
2024-02-12 02:53:52 -08:00
|
|
|
task_list.iter().for_each(|task| task.abort());
|
2024-02-07 06:51:22 -08:00
|
|
|
} // -- END reconnect loop
|
|
|
|
});
|
2023-12-22 05:42:20 -08:00
|
|
|
|
2024-01-08 05:14:59 -08:00
|
|
|
(blocks_output_stream, jh_block_emitter_task)
|
2023-12-22 05:42:20 -08:00
|
|
|
}
|
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
pub fn create_grpc_multiplex_processed_slots_subscription(
|
2023-12-22 05:42:20 -08:00
|
|
|
grpc_sources: Vec<GrpcSourceConfig>,
|
|
|
|
) -> (Receiver<SlotNotification>, AnyhowJoinHandle) {
|
2024-02-07 06:51:22 -08:00
|
|
|
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
|
2023-12-22 05:42:20 -08:00
|
|
|
info!("Setup grpc multiplexed slots connection...");
|
|
|
|
if grpc_sources.is_empty() {
|
|
|
|
info!("- no grpc connection configured");
|
|
|
|
}
|
|
|
|
for grpc_source in &grpc_sources {
|
|
|
|
info!("- connection to {}", grpc_source);
|
|
|
|
}
|
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
// multiplexed_messages_sender must not be closed from inside this method
|
2024-01-17 06:17:49 -08:00
|
|
|
let (multiplexed_messages_sender, multiplexed_messages_rx) =
|
2024-02-06 01:08:18 -08:00
|
|
|
tokio::sync::broadcast::channel(32);
|
2024-01-12 05:21:54 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
// task MUST not terminate but might be aborted from outside
|
|
|
|
let jh_multiplex_task = tokio::spawn(async move {
|
2024-01-12 05:21:54 -08:00
|
|
|
loop {
|
2024-02-07 06:51:22 -08:00
|
|
|
let mut channels = vec![];
|
2024-01-17 06:17:49 -08:00
|
|
|
for grpc_source in &grpc_sources {
|
2024-02-07 06:51:22 -08:00
|
|
|
// tasks will be shutdown automatically if the channel gets closed
|
|
|
|
let (_jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
|
|
|
|
grpc_source.clone(),
|
|
|
|
GeyserFilter(COMMITMENT_CONFIG).slots(),
|
2024-01-17 06:17:49 -08:00
|
|
|
);
|
2024-02-07 06:51:22 -08:00
|
|
|
channels.push(message_channel)
|
2024-01-17 06:17:49 -08:00
|
|
|
}
|
2023-12-22 05:42:20 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
let source_channels = channels.into_iter().map(ReceiverStream::new).collect_vec();
|
|
|
|
let mut fused_streams = source_channels.merge();
|
|
|
|
|
|
|
|
'recv_loop: loop {
|
|
|
|
let next =
|
|
|
|
tokio::time::timeout(Duration::from_secs(30), fused_streams.next()).await;
|
|
|
|
match next {
|
|
|
|
Ok(Some(Message::GeyserSubscribeUpdate(slot_update))) => {
|
|
|
|
let mapfilter = map_slot_from_yellowstone_update(*slot_update);
|
|
|
|
if let Some(slot) = mapfilter {
|
|
|
|
let send_result = multiplexed_messages_sender
|
|
|
|
.send(SlotNotification {
|
|
|
|
processed_slot: slot,
|
|
|
|
estimated_processed_slot: slot,
|
|
|
|
})
|
|
|
|
.context("Send slot to channel");
|
|
|
|
if send_result.is_err() {
|
|
|
|
warn!("Slot channel receiver is closed - aborting");
|
|
|
|
bail!("Slot channel receiver is closed - aborting");
|
|
|
|
}
|
|
|
|
|
|
|
|
trace!(
|
|
|
|
"emitted slot #{}@{} from multiplexer",
|
|
|
|
slot,
|
|
|
|
COMMITMENT_CONFIG.commitment
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(Some(Message::Connecting(attempt))) => {
|
|
|
|
if attempt > 1 {
|
|
|
|
warn!(
|
|
|
|
"Multiplexed geyser slot stream performs reconnect attempt {}",
|
|
|
|
attempt
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(None) => {}
|
|
|
|
Err(_elapsed) => {
|
|
|
|
warn!("Multiplexed geyser slot stream timeout - reconnect");
|
|
|
|
// throttle
|
|
|
|
sleep(Duration::from_millis(1500)).await;
|
|
|
|
break 'recv_loop;
|
|
|
|
}
|
2024-01-12 05:21:54 -08:00
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
} // -- END receiver loop
|
|
|
|
} // -- END reconnect loop
|
|
|
|
});
|
|
|
|
|
|
|
|
(multiplexed_messages_rx, jh_multiplex_task)
|
|
|
|
}
|
2024-01-17 06:17:49 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
/// Block metadata item emitted by the multiplexed block-meta streams.
struct BlockMeta {
    /// blockhash copied from the yellowstone block-meta update
    pub blockhash: String,
}
|
|
|
|
|
|
|
|
struct BlockMetaExtractor(CommitmentConfig);
|
|
|
|
|
|
|
|
impl FromYellowstoneExtractor for BlockMetaExtractor {
|
|
|
|
type Target = BlockMeta;
|
|
|
|
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, BlockMeta)> {
|
|
|
|
match update.update_oneof {
|
|
|
|
Some(UpdateOneof::BlockMeta(block_meta)) => Some((
|
|
|
|
block_meta.slot,
|
|
|
|
BlockMeta {
|
|
|
|
blockhash: block_meta.blockhash,
|
|
|
|
},
|
|
|
|
)),
|
|
|
|
_ => None,
|
2023-12-22 05:42:20 -08:00
|
|
|
}
|
2024-02-07 06:51:22 -08:00
|
|
|
}
|
|
|
|
}
|
2023-12-22 05:42:20 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
|
|
|
|
match update.update_oneof {
|
|
|
|
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),
|
|
|
|
_ => None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn map_block_from_yellowstone_update(
|
|
|
|
update: SubscribeUpdate,
|
|
|
|
commitment_config: CommitmentConfig,
|
|
|
|
) -> Option<(Slot, ProducedBlock)> {
|
|
|
|
match update.update_oneof {
|
|
|
|
Some(UpdateOneof::Block(update_block_message)) => {
|
|
|
|
let block = from_grpc_block_update(update_block_message, commitment_config);
|
|
|
|
Some((block.slot, block))
|
|
|
|
}
|
|
|
|
_ => None,
|
|
|
|
}
|
2023-12-22 05:42:20 -08:00
|
|
|
}
|