Resarting geyser slot and block stream in case they disconnect
This commit is contained in:
parent
d91e2e9316
commit
3f69de5247
|
@ -1,4 +1,3 @@
|
||||||
use crate::grpc_stream_utils::channelize_stream;
|
|
||||||
use crate::grpc_subscription::map_block_update;
|
use crate::grpc_subscription::map_block_update;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
|
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
|
||||||
|
@ -62,84 +61,98 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
||||||
info!("- connection to {}", grpc_source);
|
info!("- connection to {}", grpc_source);
|
||||||
}
|
}
|
||||||
|
|
||||||
let confirmed_blocks_stream = {
|
|
||||||
let commitment_config = CommitmentConfig::confirmed();
|
|
||||||
|
|
||||||
let mut streams = Vec::new();
|
|
||||||
for grpc_source in &grpc_sources {
|
|
||||||
let stream = create_geyser_reconnecting_stream(
|
|
||||||
grpc_source.clone(),
|
|
||||||
GeyserFilter(commitment_config).blocks_and_txs(),
|
|
||||||
);
|
|
||||||
streams.push(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
|
||||||
};
|
|
||||||
|
|
||||||
let finalized_blockmeta_stream = {
|
|
||||||
let commitment_config = CommitmentConfig::finalized();
|
|
||||||
|
|
||||||
let mut streams = Vec::new();
|
|
||||||
for grpc_source in &grpc_sources {
|
|
||||||
let stream = create_geyser_reconnecting_stream(
|
|
||||||
grpc_source.clone(),
|
|
||||||
GeyserFilter(commitment_config).blocks_meta(),
|
|
||||||
);
|
|
||||||
streams.push(stream);
|
|
||||||
}
|
|
||||||
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
|
|
||||||
};
|
|
||||||
|
|
||||||
// return value is the broadcast receiver
|
// return value is the broadcast receiver
|
||||||
let (producedblock_sender, blocks_output_stream) =
|
let (producedblock_sender, blocks_output_stream) =
|
||||||
tokio::sync::broadcast::channel::<ProducedBlock>(1000);
|
tokio::sync::broadcast::channel::<ProducedBlock>(1000);
|
||||||
|
|
||||||
let jh_block_emitter_task = {
|
let jh_block_emitter_task = {
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
// by blockhash
|
|
||||||
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
|
|
||||||
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
|
|
||||||
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);
|
|
||||||
|
|
||||||
let sender = producedblock_sender;
|
|
||||||
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
|
|
||||||
let mut last_finalized_slot: Slot = 0;
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
let confirmed_blocks_stream = {
|
||||||
confirmed_block = confirmed_blocks_stream.next() => {
|
let commitment_config = CommitmentConfig::confirmed();
|
||||||
let confirmed_block = confirmed_block.expect("confirmed block from stream");
|
|
||||||
trace!("got confirmed block {} with blockhash {}",
|
let mut streams = Vec::new();
|
||||||
confirmed_block.slot, confirmed_block.blockhash.clone());
|
for grpc_source in &grpc_sources {
|
||||||
if let Err(e) = sender.send(confirmed_block.clone()) {
|
let stream = create_geyser_reconnecting_stream(
|
||||||
warn!("Confirmed block channel has no receivers {e:?}");
|
grpc_source.clone(),
|
||||||
continue
|
GeyserFilter(commitment_config).blocks_and_txs(),
|
||||||
}
|
);
|
||||||
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
|
streams.push(stream);
|
||||||
},
|
}
|
||||||
meta_finalized = finalized_blockmeta_stream.next() => {
|
|
||||||
let blockhash = meta_finalized.expect("finalized block meta from stream");
|
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
||||||
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
|
};
|
||||||
let finalized_block = cached_confirmed_block.to_finalized_block();
|
|
||||||
last_finalized_slot = finalized_block.slot;
|
let finalized_blockmeta_stream = {
|
||||||
debug!("got finalized blockmeta {} with blockhash {}",
|
let commitment_config = CommitmentConfig::finalized();
|
||||||
finalized_block.slot, finalized_block.blockhash.clone());
|
|
||||||
if let Err(e) = sender.send(finalized_block) {
|
let mut streams = Vec::new();
|
||||||
warn!("Finalized block channel has no receivers {e:?}");
|
for grpc_source in &grpc_sources {
|
||||||
continue;
|
let stream = create_geyser_reconnecting_stream(
|
||||||
|
grpc_source.clone(),
|
||||||
|
GeyserFilter(commitment_config).blocks_meta(),
|
||||||
|
);
|
||||||
|
streams.push(stream);
|
||||||
|
}
|
||||||
|
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
|
||||||
|
};
|
||||||
|
|
||||||
|
// by blockhash
|
||||||
|
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
|
||||||
|
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
|
||||||
|
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);
|
||||||
|
|
||||||
|
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
|
||||||
|
let mut last_finalized_slot: Slot = 0;
|
||||||
|
let mut cleanup_without_recv_blocks: u8 = 0;
|
||||||
|
let mut cleanup_without_recv_blocks_meta: u8 = 0;
|
||||||
|
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV : u8 = 12; // 12*5 = 60s without recving data
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
confirmed_block = confirmed_blocks_stream.next() => {
|
||||||
|
cleanup_without_recv_blocks = 0;
|
||||||
|
|
||||||
|
let confirmed_block = confirmed_block.expect("confirmed block from stream");
|
||||||
|
trace!("got confirmed block {} with blockhash {}",
|
||||||
|
confirmed_block.slot, confirmed_block.blockhash.clone());
|
||||||
|
if let Err(e) = producedblock_sender.send(confirmed_block.clone()) {
|
||||||
|
warn!("Confirmed block channel has no receivers {e:?}");
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
|
||||||
|
},
|
||||||
|
meta_finalized = finalized_blockmeta_stream.next() => {
|
||||||
|
cleanup_without_recv_blocks_meta = 0;
|
||||||
|
let blockhash = meta_finalized.expect("finalized block meta from stream");
|
||||||
|
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
|
||||||
|
let finalized_block = cached_confirmed_block.to_finalized_block();
|
||||||
|
last_finalized_slot = finalized_block.slot;
|
||||||
|
debug!("got finalized blockmeta {} with blockhash {}",
|
||||||
|
finalized_block.slot, finalized_block.blockhash.clone());
|
||||||
|
if let Err(e) = producedblock_sender.send(finalized_block) {
|
||||||
|
warn!("Finalized block channel has no receivers {e:?}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = cleanup_tick.tick() => {
|
||||||
|
if cleanup_without_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
|
||||||
|
cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
|
||||||
|
log::error!("block or block meta stream stopped restaring blocks");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cleanup_without_recv_blocks += 1;
|
||||||
|
cleanup_without_recv_blocks_meta += 1;
|
||||||
|
let size_before = recent_confirmed_blocks.len();
|
||||||
|
recent_confirmed_blocks.retain(|_blockhash, block| {
|
||||||
|
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
|
||||||
|
});
|
||||||
|
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
|
||||||
|
if cnt_cleaned > 0 {
|
||||||
|
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ = cleanup_tick.tick() => {
|
|
||||||
let size_before = recent_confirmed_blocks.len();
|
|
||||||
recent_confirmed_blocks.retain(|_blockhash, block| {
|
|
||||||
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
|
|
||||||
});
|
|
||||||
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
|
|
||||||
if cnt_cleaned > 0 {
|
|
||||||
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -179,37 +192,63 @@ pub fn create_grpc_multiplex_slots_subscription(
|
||||||
info!("- connection to {}", grpc_source);
|
info!("- connection to {}", grpc_source);
|
||||||
}
|
}
|
||||||
|
|
||||||
let multiplex_stream = {
|
let (multiplexed_messages_sender, multiplexed_messages) = tokio::sync::broadcast::channel(1000);
|
||||||
let mut streams = Vec::new();
|
|
||||||
for grpc_source in &grpc_sources {
|
|
||||||
let mut slots = HashMap::new();
|
|
||||||
slots.insert(
|
|
||||||
"client".to_string(),
|
|
||||||
SubscribeRequestFilterSlots {
|
|
||||||
filter_by_commitment: Some(true),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let filter = SubscribeRequest {
|
let jh = tokio::spawn(async move {
|
||||||
slots,
|
loop {
|
||||||
accounts: Default::default(),
|
let multiplex_stream = {
|
||||||
transactions: HashMap::new(),
|
let mut streams = Vec::new();
|
||||||
entry: Default::default(),
|
for grpc_source in &grpc_sources {
|
||||||
blocks: HashMap::new(),
|
let mut slots = HashMap::new();
|
||||||
blocks_meta: HashMap::new(),
|
slots.insert(
|
||||||
commitment: Some(yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32),
|
"client".to_string(),
|
||||||
accounts_data_slice: Default::default(),
|
SubscribeRequestFilterSlots {
|
||||||
ping: None,
|
filter_by_commitment: Some(true),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let filter = SubscribeRequest {
|
||||||
|
slots,
|
||||||
|
accounts: Default::default(),
|
||||||
|
transactions: HashMap::new(),
|
||||||
|
entry: Default::default(),
|
||||||
|
blocks: HashMap::new(),
|
||||||
|
blocks_meta: HashMap::new(),
|
||||||
|
commitment: Some(yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32),
|
||||||
|
accounts_data_slice: Default::default(),
|
||||||
|
ping: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter);
|
||||||
|
streams.push(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
create_multiplexed_stream(streams, SlotExtractor {})
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter);
|
let mut multiplex_stream = std::pin::pin!(multiplex_stream);
|
||||||
streams.push(stream);
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
slot_data = multiplex_stream.next() => {
|
||||||
|
if let Some(slot_data) = slot_data {
|
||||||
|
match multiplexed_messages_sender.send(slot_data) {
|
||||||
|
Ok(receivers) => {
|
||||||
|
trace!("sent data to {} receivers", receivers);
|
||||||
|
}
|
||||||
|
Err(send_error) => log::error!("Get error while sending on slot channel {}", send_error),
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
debug!("Slot stream send None type");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(30)) => {
|
||||||
|
log::error!("Slots timedout restarting subscription");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
create_multiplexed_stream(streams, SlotExtractor {})
|
(multiplexed_messages, jh)
|
||||||
};
|
|
||||||
|
|
||||||
let (multiplexed_stream, jh_channelizer) = channelize_stream(multiplex_stream);
|
|
||||||
|
|
||||||
(multiplexed_stream, jh_channelizer)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue