add slot or block missing detection

This commit is contained in:
musitdev 2023-11-21 18:54:30 +01:00
parent 2fff2b0824
commit c14776c035
2 changed files with 89 additions and 7 deletions

View File

@ -7,10 +7,92 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::ops::Bound;
use std::sync::Arc;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
#[derive(Debug)]
pub struct BlockSlotVerifier {
block_cache: BTreeMap<u64, SubscribeUpdateBlock>,
slot_cache: BTreeSet<u64>,
}
impl BlockSlotVerifier {
pub fn new() -> Self {
BlockSlotVerifier {
block_cache: BTreeMap::new(),
slot_cache: BTreeSet::new(),
}
}
pub fn process_slot(&mut self, slot: u64) -> Option<(u64, SubscribeUpdateBlock)> {
match self.block_cache.remove(&slot) {
//the block is already seen. Return slot/block.
Some(block) => Some((slot, block)),
None => {
self.slot_cache.insert(slot);
self.verify(slot);
None
}
}
}
pub fn process_block(
&mut self,
block: SubscribeUpdateBlock,
) -> Option<(u64, SubscribeUpdateBlock)> {
let slot = block.slot;
if self.slot_cache.remove(&slot) {
//the slot is already seen. Return slot/block.
Some((slot, block))
} else {
//Cache block and wait for the slot
let old = self.block_cache.insert(slot, block);
if old.is_some() {
log::warn!("Receive 2 blocks for the same slot:{slot}");
}
None
}
}
fn verify(&mut self, current_slot: u64) {
//do some verification on cached block and slot
let old_slot: Vec<_> = self
.slot_cache
.range((
Bound::Unbounded,
Bound::Included(current_slot.saturating_sub(2)),
))
.copied()
.collect();
if old_slot.len() > 0 {
log::error!("Missing block for slots:{:?}", old_slot);
for slot in &old_slot {
self.slot_cache.remove(&slot);
}
}
//verify that there's no too old block.
let old_block_slots: Vec<_> = self
.block_cache
.range((
Bound::Unbounded,
Bound::Included(current_slot.saturating_sub(2)),
))
.map(|(slot, _)| slot)
.copied()
.collect();
if old_block_slots.len() > 0 {
log::error!("Missing slot for block slot:{:?}", old_slot);
for slot in old_block_slots {
self.block_cache.remove(&slot);
}
}
}
}
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
pub struct Epoch {
pub epoch: u64,

View File

@ -36,6 +36,7 @@ curl http://localhost:3001 -X POST -H "Content-Type: application/json" -d '
use crate::bootstrap::BootstrapData;
use crate::bootstrap::BootstrapEvent;
use crate::epoch::BlockSlotVerifier;
use crate::leader_schedule::LeaderScheduleData;
use crate::stakestore::StakeStore;
use crate::votestore::VoteStore;
@ -261,7 +262,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//use to process block at confirm slot.
//at confirm slot are send before the block.
//Verify that the last confirmed slot receive is the block slot.
let mut last_confirmed_slot = 0;
let mut block_slot_verifier = BlockSlotVerifier::new();
loop {
tokio::select! {
Some(req) = request_rx.recv() => {
@ -414,9 +415,10 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
if let CommitmentLevel::Confirmed = slot.status() {
log::trace!("Receive confirmed slot:{}", slot.slot);
last_confirmed_slot = slot.slot;
if let Some((slot, block)) = block_slot_verifier.process_slot(slot.slot) {
process_block(block, slot);
}
}
}
Some(UpdateOneof::BlockMeta(block_meta)) => {
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
@ -428,11 +430,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
block.parent_slot,
);
let slot = block.slot;
if last_confirmed_slot != 0 && last_confirmed_slot != slot {
log::error!("No block found for slot:{last_confirmed_slot}. Get bloc for slot:{slot}");
if let Some((slot, block)) = block_slot_verifier.process_block(block) {
process_block(block, slot);
}
process_block(block, slot);
}
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),