grpc, proto: add unary is_blockhash_valid (#132)

This commit is contained in:
Kirill Fomichev 2023-05-26 14:11:58 -04:00 committed by GitHub
parent 6812298fbc
commit 604add3080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 157 additions and 30 deletions

View File

@ -78,6 +78,10 @@ enum Action {
GetLatestBlockhash, GetLatestBlockhash,
GetBlockHeight, GetBlockHeight,
GetSlot, GetSlot,
IsBlockhashValid {
#[clap(long, short)]
blockhash: String,
},
GetVersion, GetVersion,
} }
@ -316,27 +320,32 @@ async fn main() -> anyhow::Result<()> {
.ping(*count) .ping(*count)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {response:?}")),
Action::GetLatestBlockhash => client Action::GetLatestBlockhash => client
.get_latest_blockhash(commitment) .get_latest_blockhash(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {response:?}")),
Action::GetBlockHeight => client Action::GetBlockHeight => client
.get_block_height(commitment) .get_block_height(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {response:?}")),
Action::GetSlot => client Action::GetSlot => client
.get_slot(commitment) .get_slot(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {response:?}")),
Action::IsBlockhashValid { blockhash } => client
.is_blockhash_valid(blockhash.clone(), commitment)
.await
.map_err(anyhow::Error::new)
.map(|response| info!("response: {response:?}")),
Action::GetVersion => client Action::GetVersion => client
.get_version() .get_version()
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {response:?}")),
} }
.map_err(backoff::Error::transient)?; .map_err(backoff::Error::transient)?;

View File

@ -17,10 +17,11 @@ use {
yellowstone_grpc_proto::prelude::{ yellowstone_grpc_proto::prelude::{
geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest, geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, PingRequest, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse,
PongResponse, SubscribeRequest, SubscribeRequestFilterAccounts, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeUpdate,
}, },
}; };
@ -163,6 +164,19 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
Ok(response.into_inner()) Ok(response.into_inner())
} }
pub async fn is_blockhash_valid(
&mut self,
blockhash: String,
commitment: Option<CommitmentLevel>,
) -> GeyserGrpcClientResult<IsBlockhashValidResponse> {
let request = tonic::Request::new(IsBlockhashValidRequest {
blockhash,
commitment: commitment.map(|value| value as i32),
});
let response = self.client.is_blockhash_valid(request).await?;
Ok(response.into_inner())
}
pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> { pub async fn get_version(&mut self) -> GeyserGrpcClientResult<GetVersionResponse> {
let request = tonic::Request::new(GetVersionRequest {}); let request = tonic::Request::new(GetVersionRequest {});
let response = self.client.get_version(request).await?; let response = self.client.get_version(request).await?;

View File

@ -9,10 +9,11 @@ use {
subscribe_update::UpdateOneof, subscribe_update::UpdateOneof,
CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse,
GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse,
GetVersionRequest, GetVersionResponse, PingRequest, PongResponse, SubscribeRequest, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest,
SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate,
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeUpdateBlock,
SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, SubscribeUpdateBlockMeta, SubscribeUpdatePing, SubscribeUpdateSlot,
SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
}, },
version::VERSION, version::VERSION,
}, },
@ -21,7 +22,9 @@ use {
ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus, ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus,
}, },
solana_sdk::{ solana_sdk::{
clock::UnixTimestamp, pubkey::Pubkey, signature::Signature, clock::{UnixTimestamp, MAX_RECENT_BLOCKHASHES},
pubkey::Pubkey,
signature::Signature,
transaction::SanitizedTransaction, transaction::SanitizedTransaction,
}, },
solana_transaction_status::{Reward, TransactionStatusMeta}, solana_transaction_status::{Reward, TransactionStatusMeta},
@ -297,9 +300,29 @@ struct ClientConnection {
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>, stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
} }
#[derive(Debug)]
struct BlockhashStatus {
slot: u64,
processed: bool,
confirmed: bool,
finalized: bool,
}
impl BlockhashStatus {
fn new(slot: u64) -> Self {
Self {
slot,
processed: false,
confirmed: false,
finalized: false,
}
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct BlockMetaStorageInner { struct BlockMetaStorageInner {
blocks: BTreeMap<u64, MessageBlockMeta>, blocks: BTreeMap<u64, MessageBlockMeta>,
blockhashes: HashMap<String, BlockhashStatus>,
processed: Option<u64>, processed: Option<u64>,
confirmed: Option<u64>, confirmed: Option<u64>,
finalized: Option<u64>, finalized: Option<u64>,
@ -330,8 +353,37 @@ impl BlockMetaStorage {
} }
.replace(msg.slot); .replace(msg.slot);
if let Some(blockhash) = storage
.blocks
.get(&msg.slot)
.map(|block| block.blockhash.clone())
{
let entry = storage
.blockhashes
.entry(blockhash)
.or_insert_with(|| BlockhashStatus::new(msg.slot));
let status = match msg.status {
CommitmentLevel::Processed => &mut entry.processed,
CommitmentLevel::Confirmed => &mut entry.confirmed,
CommitmentLevel::Finalized => &mut entry.finalized,
};
*status = true;
}
if msg.status == CommitmentLevel::Finalized { if msg.status == CommitmentLevel::Finalized {
clean_btree_slots(&mut storage.blocks, msg.slot - KEEP_SLOTS); let keep_slot = msg.slot - KEEP_SLOTS;
loop {
match storage.blocks.keys().next().cloned() {
Some(slot) if slot < keep_slot => storage.blocks.remove(&slot),
_ => break,
};
}
let keep_slot = msg.slot - MAX_RECENT_BLOCKHASHES as u64 - 32;
storage
.blockhashes
.retain(|_blockhash, status| status.slot >= keep_slot);
} }
} }
Message::BlockMeta(msg) => { Message::BlockMeta(msg) => {
@ -347,6 +399,14 @@ impl BlockMetaStorage {
(Self { inner }, tx) (Self { inner }, tx)
} }
fn parse_commitment(commitment: Option<i32>) -> Result<CommitmentLevel, Status> {
let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32);
CommitmentLevel::from_i32(commitment).ok_or_else(|| {
let msg = format!("failed to create CommitmentLevel from {commitment:?}");
Status::unknown(msg)
})
}
async fn get_block<F, T>( async fn get_block<F, T>(
&self, &self,
handler: F, handler: F,
@ -355,12 +415,7 @@ impl BlockMetaStorage {
where where
F: FnOnce(&MessageBlockMeta) -> Option<T>, F: FnOnce(&MessageBlockMeta) -> Option<T>,
{ {
let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32); let commitment = Self::parse_commitment(commitment)?;
let commitment = CommitmentLevel::from_i32(commitment).ok_or_else(|| {
let msg = format!("failed to create CommitmentLevel from {commitment:?}");
Status::unknown(msg)
})?;
let storage = self.inner.read().await; let storage = self.inner.read().await;
let slot = match commitment { let slot = match commitment {
@ -368,6 +423,7 @@ impl BlockMetaStorage {
CommitmentLevel::Confirmed => storage.confirmed, CommitmentLevel::Confirmed => storage.confirmed,
CommitmentLevel::Finalized => storage.finalized, CommitmentLevel::Finalized => storage.finalized,
}; };
match slot.and_then(|slot| storage.blocks.get(&slot)) { match slot.and_then(|slot| storage.blocks.get(&slot)) {
Some(block) => match handler(block) { Some(block) => match handler(block) {
Some(resp) => Ok(Response::new(resp)), Some(resp) => Ok(Response::new(resp)),
@ -376,15 +432,37 @@ impl BlockMetaStorage {
None => Err(Status::internal("block is not available yet")), None => Err(Status::internal("block is not available yet")),
} }
} }
}
fn clean_btree_slots<T>(storage: &mut BTreeMap<u64, T>, keep_slot: u64) { async fn is_blockhash_valid(
while let Some(slot) = storage.keys().next().cloned() { &self,
if slot < keep_slot { blockhash: &str,
storage.remove(&slot); commitment: Option<i32>,
} else { ) -> Result<Response<IsBlockhashValidResponse>, Status> {
break; let commitment = Self::parse_commitment(commitment)?;
let storage = self.inner.read().await;
if storage.blockhashes.len() < MAX_RECENT_BLOCKHASHES + 32 {
return Err(Status::internal("startup"));
} }
let slot = match commitment {
CommitmentLevel::Processed => storage.processed,
CommitmentLevel::Confirmed => storage.confirmed,
CommitmentLevel::Finalized => storage.finalized,
}
.ok_or_else(|| Status::internal("startup"))?;
let valid = storage
.blockhashes
.get(blockhash)
.map(|status| match commitment {
CommitmentLevel::Processed => status.processed,
CommitmentLevel::Confirmed => status.confirmed,
CommitmentLevel::Finalized => status.finalized,
})
.unwrap_or(false);
Ok(Response::new(IsBlockhashValidResponse { valid, slot }))
} }
} }
@ -507,7 +585,13 @@ impl GrpcService {
} }
if slot.status == CommitmentLevel::Finalized { if slot.status == CommitmentLevel::Finalized {
clean_btree_slots(&mut messages, slot.slot - KEEP_SLOTS); let keep_slot = slot.slot - KEEP_SLOTS;
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < keep_slot => messages.remove(&slot),
_ => break,
};
}
} }
}, },
Some(msg) = new_clients_rx.recv() => { Some(msg) = new_clients_rx.recv() => {
@ -672,6 +756,16 @@ impl Geyser for GrpcService {
.await .await
} }
async fn is_blockhash_valid(
&self,
request: Request<IsBlockhashValidRequest>,
) -> Result<Response<IsBlockhashValidResponse>, Status> {
let req = request.get_ref();
self.blocks_meta
.is_blockhash_valid(&req.blockhash, req.commitment)
.await
}
async fn get_version( async fn get_version(
&self, &self,
_request: Request<GetVersionRequest>, _request: Request<GetVersionRequest>,

View File

@ -12,6 +12,7 @@ service Geyser {
rpc GetLatestBlockhash(GetLatestBlockhashRequest) returns (GetLatestBlockhashResponse) {} rpc GetLatestBlockhash(GetLatestBlockhashRequest) returns (GetLatestBlockhashResponse) {}
rpc GetBlockHeight(GetBlockHeightRequest) returns (GetBlockHeightResponse) {} rpc GetBlockHeight(GetBlockHeightRequest) returns (GetBlockHeightResponse) {}
rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {} rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {}
rpc isBlockhashValid(IsBlockhashValidRequest) returns (IsBlockhashValidResponse) {}
rpc GetVersion(GetVersionRequest) returns (GetVersionResponse) {} rpc GetVersion(GetVersionRequest) returns (GetVersionResponse) {}
} }
@ -156,7 +157,6 @@ message GetLatestBlockhashRequest {
} }
message GetLatestBlockhashResponse { message GetLatestBlockhashResponse {
// The latest blockhash
uint64 slot = 1; uint64 slot = 1;
string blockhash = 2; string blockhash = 2;
uint64 last_valid_block_height = 3; uint64 last_valid_block_height = 3;
@ -183,3 +183,13 @@ message GetVersionRequest {}
message GetVersionResponse { message GetVersionResponse {
string version = 1; string version = 1;
} }
message IsBlockhashValidRequest {
string blockhash = 1;
optional CommitmentLevel commitment = 2;
}
message IsBlockhashValidResponse {
uint64 slot = 1;
bool valid = 2;
}