diff --git a/stake_aggregate/src/epoch.rs b/stake_aggregate/src/epoch.rs index bd03108..2257490 100644 --- a/stake_aggregate/src/epoch.rs +++ b/stake_aggregate/src/epoch.rs @@ -7,10 +7,92 @@ use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::epoch_info::EpochInfo; use solana_sdk::sysvar::epoch_schedule::EpochSchedule; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::ops::Bound; use std::sync::Arc; use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel; +use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock; use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot; +#[derive(Debug)] +pub struct BlockSlotVerifier { + block_cache: BTreeMap, + slot_cache: BTreeSet, +} + +impl BlockSlotVerifier { + pub fn new() -> Self { + BlockSlotVerifier { + block_cache: BTreeMap::new(), + slot_cache: BTreeSet::new(), + } + } + pub fn process_slot(&mut self, slot: u64) -> Option<(u64, SubscribeUpdateBlock)> { + match self.block_cache.remove(&slot) { + //the block is already seen. Return slot/block. + Some(block) => Some((slot, block)), + None => { + self.slot_cache.insert(slot); + self.verify(slot); + None + } + } + } + pub fn process_block( + &mut self, + block: SubscribeUpdateBlock, + ) -> Option<(u64, SubscribeUpdateBlock)> { + let slot = block.slot; + if self.slot_cache.remove(&slot) { + //the slot is already seen. Return slot/block. + Some((slot, block)) + } else { + //Cache block and wait for the slot + let old = self.block_cache.insert(slot, block); + if old.is_some() { + log::warn!("Receive 2 blocks for the same slot:{slot}"); + } + None + } + } + + fn verify(&mut self, current_slot: u64) { + //do some verification on cached block and slot + let old_slot: Vec<_> = self + .slot_cache + .range(( + Bound::Unbounded, + Bound::Included(current_slot.saturating_sub(2)), + )) + .copied() + .collect(); + if old_slot.len() > 0 { + log::error!("Missing block for slots:{:?}", old_slot); + for slot in &old_slot { + self.slot_cache.remove(&slot); + } + } + + //verify that there's no too old block. + let old_block_slots: Vec<_> = self + .block_cache + .range(( + Bound::Unbounded, + Bound::Included(current_slot.saturating_sub(2)), + )) + .map(|(slot, _)| slot) + .copied() + .collect(); + if old_block_slots.len() > 0 { + log::error!("Missing slot for block slot:{:?}", old_slot); + for slot in old_block_slots { + self.block_cache.remove(&slot); + } + } + } +} + #[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub struct Epoch { pub epoch: u64, diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 6526b75..cd5479b 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -36,6 +36,7 @@ curl http://localhost:3001 -X POST -H "Content-Type: application/json" -d ' use crate::bootstrap::BootstrapData; use crate::bootstrap::BootstrapEvent; +use crate::epoch::BlockSlotVerifier; use crate::leader_schedule::LeaderScheduleData; use crate::stakestore::StakeStore; use crate::votestore::VoteStore; @@ -261,7 +262,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //use to process block at confirm slot. //at confirm slot are send before the block. //Verify that the last confirmed slot receive is the block slot. - let mut last_confirmed_slot = 0; + let mut block_slot_verifier = BlockSlotVerifier::new(); loop { tokio::select! { Some(req) = request_rx.recv() => { @@ -414,9 +415,10 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re if let CommitmentLevel::Confirmed = slot.status() { log::trace!("Receive confirmed slot:{}", slot.slot); - last_confirmed_slot = slot.slot; + if let Some((slot, block)) = block_slot_verifier.process_slot(slot.slot) { + process_block(block, slot); + } } - } Some(UpdateOneof::BlockMeta(block_meta)) => { log::info!("Receive Block Meta at slot: {}", block_meta.slot); @@ -428,11 +430,9 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re block.parent_slot, ); - let slot = block.slot; - if last_confirmed_slot != 0 && last_confirmed_slot != slot { - log::error!("No block found for slot:{last_confirmed_slot}. Get bloc for slot:{slot}"); + if let Some((slot, block)) = block_slot_verifier.process_block(block) { + process_block(block, slot); } - process_block(block, slot); } Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),