diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 059491a..ff101a6 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -78,6 +78,10 @@ enum Action { GetLatestBlockhash, GetBlockHeight, GetSlot, + IsBlockhashValid { + #[clap(long, short)] + blockhash: String, + }, GetVersion, } @@ -316,27 +320,32 @@ async fn main() -> anyhow::Result<()> { .ping(*count) .await .map_err(anyhow::Error::new) - .map(|response| info!("response: {:?}", response)), + .map(|response| info!("response: {response:?}")), Action::GetLatestBlockhash => client .get_latest_blockhash(commitment) .await .map_err(anyhow::Error::new) - .map(|response| info!("response: {:?}", response)), + .map(|response| info!("response: {response:?}")), Action::GetBlockHeight => client .get_block_height(commitment) .await .map_err(anyhow::Error::new) - .map(|response| info!("response: {:?}", response)), + .map(|response| info!("response: {response:?}")), Action::GetSlot => client .get_slot(commitment) .await .map_err(anyhow::Error::new) - .map(|response| info!("response: {:?}", response)), + .map(|response| info!("response: {response:?}")), + Action::IsBlockhashValid { blockhash } => client + .is_blockhash_valid(blockhash.clone(), commitment) + .await + .map_err(anyhow::Error::new) + .map(|response| info!("response: {response:?}")), Action::GetVersion => client .get_version() .await .map_err(anyhow::Error::new) - .map(|response| info!("response: {:?}", response)), + .map(|response| info!("response: {response:?}")), } .map_err(backoff::Error::transient)?; diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index ba95a23..c9e5fea 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -17,10 +17,11 @@ use { yellowstone_grpc_proto::prelude::{ geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, - GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, PingRequest, - PongResponse, SubscribeRequest, SubscribeRequestFilterAccounts, - SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, - SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate, + GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, + IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse, + SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeUpdate, }, }; @@ -163,6 +164,19 @@ impl GeyserGrpcClient { Ok(response.into_inner()) } + pub async fn is_blockhash_valid( + &mut self, + blockhash: String, + commitment: Option, + ) -> GeyserGrpcClientResult { + let request = tonic::Request::new(IsBlockhashValidRequest { + blockhash, + commitment: commitment.map(|value| value as i32), + }); + let response = self.client.is_blockhash_valid(request).await?; + Ok(response.into_inner()) + } + pub async fn get_version(&mut self) -> GeyserGrpcClientResult { let request = tonic::Request::new(GetVersionRequest {}); let response = self.client.get_version(request).await?; diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 481a0ce..ddacbd5 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -9,10 +9,11 @@ use { subscribe_update::UpdateOneof, CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, - GetVersionRequest, GetVersionResponse, PingRequest, PongResponse, SubscribeRequest, - SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, - SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing, - SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, + IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, + SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, SubscribeUpdatePing, SubscribeUpdateSlot, + SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, }, version::VERSION, }, @@ -21,7 +22,9 @@ use { ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus, }, solana_sdk::{ - clock::UnixTimestamp, pubkey::Pubkey, signature::Signature, + clock::{UnixTimestamp, MAX_RECENT_BLOCKHASHES}, + pubkey::Pubkey, + signature::Signature, transaction::SanitizedTransaction, }, solana_transaction_status::{Reward, TransactionStatusMeta}, @@ -297,9 +300,29 @@ struct ClientConnection { stream_tx: mpsc::Sender>, } +#[derive(Debug)] +struct BlockhashStatus { + slot: u64, + processed: bool, + confirmed: bool, + finalized: bool, +} + +impl BlockhashStatus { + fn new(slot: u64) -> Self { + Self { + slot, + processed: false, + confirmed: false, + finalized: false, + } + } +} + #[derive(Debug, Default)] struct BlockMetaStorageInner { blocks: BTreeMap, + blockhashes: HashMap, processed: Option, confirmed: Option, finalized: Option, @@ -330,8 +353,37 @@ impl BlockMetaStorage { } .replace(msg.slot); + if let Some(blockhash) = storage + .blocks + .get(&msg.slot) + .map(|block| block.blockhash.clone()) + { + let entry = storage + .blockhashes + .entry(blockhash) + .or_insert_with(|| BlockhashStatus::new(msg.slot)); + + let status = match msg.status { + CommitmentLevel::Processed => &mut entry.processed, + CommitmentLevel::Confirmed => &mut entry.confirmed, + CommitmentLevel::Finalized => &mut entry.finalized, + }; + *status = true; + } + if msg.status == CommitmentLevel::Finalized { - clean_btree_slots(&mut storage.blocks, msg.slot - KEEP_SLOTS); + let keep_slot = msg.slot - KEEP_SLOTS; + loop { + match storage.blocks.keys().next().cloned() { + Some(slot) if slot < keep_slot => storage.blocks.remove(&slot), + _ => break, + }; + } + + let keep_slot = msg.slot - MAX_RECENT_BLOCKHASHES as u64 - 32; + storage + .blockhashes + .retain(|_blockhash, status| status.slot >= keep_slot); } } Message::BlockMeta(msg) => { @@ -347,6 +399,14 @@ impl BlockMetaStorage { (Self { inner }, tx) } + fn parse_commitment(commitment: Option) -> Result { + let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32); + CommitmentLevel::from_i32(commitment).ok_or_else(|| { + let msg = format!("failed to create CommitmentLevel from {commitment:?}"); + Status::unknown(msg) + }) + } + async fn get_block( &self, handler: F, @@ -355,12 +415,7 @@ impl BlockMetaStorage { where F: FnOnce(&MessageBlockMeta) -> Option, { - let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32); - let commitment = CommitmentLevel::from_i32(commitment).ok_or_else(|| { - let msg = format!("failed to create CommitmentLevel from {commitment:?}"); - Status::unknown(msg) - })?; - + let commitment = Self::parse_commitment(commitment)?; let storage = self.inner.read().await; let slot = match commitment { @@ -368,6 +423,7 @@ impl BlockMetaStorage { CommitmentLevel::Confirmed => storage.confirmed, CommitmentLevel::Finalized => storage.finalized, }; + match slot.and_then(|slot| storage.blocks.get(&slot)) { Some(block) => match handler(block) { Some(resp) => Ok(Response::new(resp)), @@ -376,15 +432,37 @@ impl BlockMetaStorage { None => Err(Status::internal("block is not available yet")), } } -} -fn clean_btree_slots(storage: &mut BTreeMap, keep_slot: u64) { - while let Some(slot) = storage.keys().next().cloned() { - if slot < keep_slot { - storage.remove(&slot); - } else { - break; + async fn is_blockhash_valid( + &self, + blockhash: &str, + commitment: Option, + ) -> Result, Status> { + let commitment = Self::parse_commitment(commitment)?; + let storage = self.inner.read().await; + + if storage.blockhashes.len() < MAX_RECENT_BLOCKHASHES + 32 { + return Err(Status::internal("startup")); } + + let slot = match commitment { + CommitmentLevel::Processed => storage.processed, + CommitmentLevel::Confirmed => storage.confirmed, + CommitmentLevel::Finalized => storage.finalized, + } + .ok_or_else(|| Status::internal("startup"))?; + + let valid = storage + .blockhashes + .get(blockhash) + .map(|status| match commitment { + CommitmentLevel::Processed => status.processed, + CommitmentLevel::Confirmed => status.confirmed, + CommitmentLevel::Finalized => status.finalized, + }) + .unwrap_or(false); + + Ok(Response::new(IsBlockhashValidResponse { valid, slot })) } } @@ -507,7 +585,13 @@ impl GrpcService { } if slot.status == CommitmentLevel::Finalized { - clean_btree_slots(&mut messages, slot.slot - KEEP_SLOTS); + let keep_slot = slot.slot - KEEP_SLOTS; + loop { + match messages.keys().next().cloned() { + Some(slot) if slot < keep_slot => messages.remove(&slot), + _ => break, + }; + } } }, Some(msg) = new_clients_rx.recv() => { @@ -672,6 +756,16 @@ impl Geyser for GrpcService { .await } + async fn is_blockhash_valid( + &self, + request: Request, + ) -> Result, Status> { + let req = request.get_ref(); + self.blocks_meta + .is_blockhash_valid(&req.blockhash, req.commitment) + .await + } + async fn get_version( &self, _request: Request, diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index 0e9789e..fabcfd5 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -12,6 +12,7 @@ service Geyser { rpc GetLatestBlockhash(GetLatestBlockhashRequest) returns (GetLatestBlockhashResponse) {} rpc GetBlockHeight(GetBlockHeightRequest) returns (GetBlockHeightResponse) {} rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {} + rpc isBlockhashValid(IsBlockhashValidRequest) returns (IsBlockhashValidResponse) {} rpc GetVersion(GetVersionRequest) returns (GetVersionResponse) {} } @@ -156,7 +157,6 @@ message GetLatestBlockhashRequest { } message GetLatestBlockhashResponse { - // The latest blockhash uint64 slot = 1; string blockhash = 2; uint64 last_valid_block_height = 3; @@ -183,3 +183,13 @@ message GetVersionRequest {} message GetVersionResponse { string version = 1; } + +message IsBlockhashValidRequest { + string blockhash = 1; + optional CommitmentLevel commitment = 2; +} + +message IsBlockhashValidResponse { + uint64 slot = 1; + bool valid = 2; +}