diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 66d6485..c6bec9d 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -1,6 +1,6 @@ use { backoff::{future::retry, ExponentialBackoff}, - clap::{Parser, Subcommand}, + clap::{Parser, Subcommand, ValueEnum}, futures::{sink::SinkExt, stream::StreamExt}, log::{error, info}, solana_sdk::pubkey::Pubkey, @@ -10,11 +10,11 @@ use { prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts, - SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, - SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, - SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, - SubscribeUpdateAccount, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, + SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, + SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeUpdateAccount, }, tonic::service::Interceptor, }, @@ -36,10 +36,38 @@ struct Args { #[clap(long)] x_token: Option, + /// Commitment level: processed, confirmed or finalized + #[clap(long)] + commitment: Option, + #[command(subcommand)] action: Action, } +impl Args { + fn get_commitment(&self) -> Option { + Some(self.commitment.unwrap_or_default().into()) + } +} + +#[derive(Debug, Clone, Copy, Default, ValueEnum)] +enum ArgsCommitment { + Processed, + Confirmed, + #[default] + Finalized, +} + +impl From for CommitmentLevel { + fn from(commitment: ArgsCommitment) -> Self { + match commitment { + ArgsCommitment::Processed => CommitmentLevel::Processed, + ArgsCommitment::Confirmed => CommitmentLevel::Confirmed, + ArgsCommitment::Finalized => CommitmentLevel::Finalized, + } + } +} + #[derive(Debug, Clone, Subcommand)] enum Action { Subscribe(Box), @@ -120,7 +148,10 @@ struct ActionSubscribe { } impl Action { - fn get_subscribe_request(&self) -> anyhow::Result> { + fn get_subscribe_request( + &self, + commitment: Option, + ) -> anyhow::Result> { Ok(match self { Self::Subscribe(args) => { let mut accounts: AccountFilterMap = HashMap::new(); @@ -198,6 +229,7 @@ impl Action { transactions, blocks, blocks_meta, + commitment: commitment.map(|x| x as i32), }, args.resub.unwrap_or(0), )) @@ -264,6 +296,8 @@ async fn main() -> anyhow::Result<()> { async move { info!("Retry to connect to the server"); + + let commitment = args.get_commitment(); let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None) .map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?; @@ -271,7 +305,7 @@ async fn main() -> anyhow::Result<()> { Action::Subscribe(_) => { let (request, resub) = args .action - .get_subscribe_request() + .get_subscribe_request(commitment) .map_err(backoff::Error::Permanent)? .expect("expect subscribe action"); @@ -283,17 +317,17 @@ async fn main() -> anyhow::Result<()> { .map_err(anyhow::Error::new) .map(|response| info!("response: {:?}", response)), Action::GetLatestBlockhash => client - .get_latest_blockhash() + .get_latest_blockhash(commitment) .await .map_err(anyhow::Error::new) .map(|response| info!("response: {:?}", response)), Action::GetBlockHeight => client - .get_block_height() + .get_block_height(commitment) .await .map_err(anyhow::Error::new) .map(|response| info!("response: {:?}", response)), Action::GetSlot => client - .get_slot() + .get_slot(commitment) .await .map_err(anyhow::Error::new) .map(|response| info!("response: {:?}", response)), @@ -353,6 +387,7 @@ async fn geyser_subscribe( transactions: HashMap::default(), blocks: HashMap::default(), blocks_meta: HashMap::default(), + commitment: None, }) .await .map_err(GeyserGrpcClientError::SubscribeSendError)?; diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 8592c20..42b0f60 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -14,15 +14,13 @@ use { transport::channel::{Channel, ClientTlsConfig}, Request, Response, Status, }, - yellowstone_grpc_proto::{ - geyser::{GetSlotResponse, PongResponse}, - prelude::{ - geyser_client::GeyserClient, GetBlockHeightRequest, GetBlockHeightResponse, - GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, PingRequest, - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeUpdate, - }, + yellowstone_grpc_proto::prelude::{ + geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest, + GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, + GetSlotRequest, GetSlotResponse, PingRequest, PongResponse, SubscribeRequest, + SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeUpdate, }, }; @@ -109,6 +107,7 @@ impl GeyserGrpcClient { transactions: HashMap, blocks: HashMap, blocks_meta: HashMap, + commitment: Option, ) -> GeyserGrpcClientResult>> { let (mut subscribe_tx, response) = self.subscribe().await?; subscribe_tx @@ -118,6 +117,7 @@ impl GeyserGrpcClient { transactions, blocks, blocks_meta, + commitment: commitment.map(|value| value as i32), }) .await?; Ok(response) @@ -132,20 +132,33 @@ impl GeyserGrpcClient { pub async fn get_latest_blockhash( &mut self, + commitment: Option, ) -> GeyserGrpcClientResult { - let request = tonic::Request::new(GetLatestBlockhashRequest {}); + let request = tonic::Request::new(GetLatestBlockhashRequest { + commitment: commitment.map(|value| value as i32), + }); let response = self.client.get_latest_blockhash(request).await?; Ok(response.into_inner()) } - pub async fn get_block_height(&mut self) -> GeyserGrpcClientResult { - let request = tonic::Request::new(GetBlockHeightRequest {}); + pub async fn get_block_height( + &mut self, + commitment: Option, + ) -> GeyserGrpcClientResult { + let request = tonic::Request::new(GetBlockHeightRequest { + commitment: commitment.map(|value| value as i32), + }); let response = self.client.get_block_height(request).await?; Ok(response.into_inner()) } - pub async fn get_slot(&mut self) -> GeyserGrpcClientResult { - let request = tonic::Request::new(GetSlotRequest {}); + pub async fn get_slot( + &mut self, + commitment: Option, + ) -> GeyserGrpcClientResult { + let request = tonic::Request::new(GetSlotRequest { + commitment: commitment.map(|value| value as i32), + }); let response = self.client.get_slot(request).await?; Ok(response.into_inner()) } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index cce635c..d886fb8 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -11,9 +11,10 @@ use { proto::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, - SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, - SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, + CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, SubscribeUpdate, }, }, base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, @@ -32,6 +33,7 @@ pub struct Filter { transactions: FilterTransactions, blocks: FilterBlocks, blocks_meta: FilterBlocksMeta, + commitment: CommitmentLevel, } impl Filter { @@ -42,9 +44,16 @@ impl Filter { transactions: FilterTransactions::new(&config.transactions, &limit.transactions)?, blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?, + commitment: Self::decode_commitment(config.commitment)?, }) } + fn decode_commitment(commitment: Option) -> anyhow::Result { + let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32); + CommitmentLevel::from_i32(commitment) + .ok_or_else(|| anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}")) + } + fn decode_pubkeys>( pubkeys: &[String], limit: &HashSet, @@ -70,6 +79,26 @@ impl Filter { Message::BlockMeta(message) => self.blocks_meta.get_filters(message), } } + + pub fn get_update( + &self, + message: &Message, + commitment: CommitmentLevel, + ) -> Option { + if commitment == self.commitment || matches!(message, Message::Slot(_)) { + let filters = self.get_filters(message); + if filters.is_empty() { + None + } else { + Some(SubscribeUpdate { + filters, + update_oneof: Some(message.into()), + }) + } + } else { + None + } + } } #[derive(Debug, Default)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 01d88f8..ade580a 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -7,15 +7,13 @@ use { self, geyser_server::{Geyser, GeyserServer}, subscribe_update::UpdateOneof, - SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, - SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing, - SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, + CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, + GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, + PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateBlockMeta, + SubscribeUpdatePing, SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, }, - proto::{ - GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, - GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, PingRequest, PongResponse, - }, }, log::*, solana_geyser_plugin_interface::geyser_plugin_interface::{ @@ -27,7 +25,7 @@ use { }, solana_transaction_status::{Reward, TransactionStatusMeta}, std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::atomic::{AtomicUsize, Ordering}, sync::Arc, time::Duration, @@ -45,7 +43,7 @@ use { tonic_health::server::health_reporter, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageAccountInfo { pub pubkey: Pubkey, pub lamports: u64, @@ -57,7 +55,7 @@ pub struct MessageAccountInfo { pub txn_signature: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageAccount { pub account: MessageAccountInfo, pub slot: u64, @@ -83,11 +81,11 @@ impl<'a> From<(&'a ReplicaAccountInfoV2<'a>, u64, bool)> for MessageAccount { } } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct MessageSlot { pub slot: u64, pub parent: Option, - pub status: SubscribeUpdateSlotStatus, + pub status: CommitmentLevel, } impl From<(u64, Option, SlotStatus)> for MessageSlot { @@ -96,9 +94,9 @@ impl From<(u64, Option, SlotStatus)> for MessageSlot { slot, parent, status: match status { - SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed, - SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed, - SlotStatus::Rooted => SubscribeUpdateSlotStatus::Finalized, + SlotStatus::Processed => CommitmentLevel::Processed, + SlotStatus::Confirmed => CommitmentLevel::Confirmed, + SlotStatus::Rooted => CommitmentLevel::Finalized, }, } } @@ -125,7 +123,7 @@ impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageTransaction { pub transaction: MessageTransactionInfo, pub slot: u64, @@ -146,7 +144,7 @@ impl<'a> From<(&'a ReplicaTransactionInfoV2<'a>, u64)> for MessageTransaction { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageBlock { pub parent_slot: u64, pub slot: u64, @@ -200,7 +198,7 @@ impl<'a> From<&'a ReplicaBlockInfoV2<'a>> for MessageBlockMeta { } } -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] pub enum Message { Slot(MessageSlot), @@ -264,6 +262,18 @@ impl From<&Message> for UpdateOneof { } } +impl Message { + pub fn get_slot(&self) -> u64 { + match self { + Self::Slot(msg) => msg.slot, + Self::Account(msg) => msg.slot, + Self::Transaction(msg) => msg.slot, + Self::Block(msg) => msg.slot, + Self::BlockMeta(msg) => msg.slot, + } + } +} + #[derive(Debug)] enum ClientMessage { New { @@ -286,18 +296,108 @@ struct ClientConnection { stream_tx: mpsc::Sender>, } +#[derive(Debug, Default)] +struct BlockMetaStorageInner { + blocks: BTreeMap, + processed: Option, + confirmed: Option, + finalized: Option, +} + +#[derive(Debug)] +struct BlockMetaStorage { + inner: Arc>, +} + +impl BlockMetaStorage { + fn new() -> (Self, mpsc::UnboundedSender) { + let inner = Arc::new(RwLock::new(BlockMetaStorageInner::default())); + let (tx, mut rx) = mpsc::unbounded_channel(); + + let storage = Arc::clone(&inner); + tokio::spawn(async move { + const KEEP_SLOTS: u64 = 3; + + while let Some(message) = rx.recv().await { + let mut storage = storage.write().await; + match message { + Message::Slot(msg) => { + match msg.status { + CommitmentLevel::Processed => &mut storage.processed, + CommitmentLevel::Confirmed => &mut storage.confirmed, + CommitmentLevel::Finalized => &mut storage.finalized, + } + .replace(msg.slot); + + if msg.status == CommitmentLevel::Finalized { + clean_btree_slots(&mut storage.blocks, msg.slot - KEEP_SLOTS); + } + } + Message::BlockMeta(msg) => { + storage.blocks.insert(msg.slot, msg); + } + msg => { + error!("invalid message in BlockMetaStorage: {msg:?}"); + } + } + } + }); + + (Self { inner }, tx) + } + + async fn get_block( + &self, + handler: F, + commitment: Option, + ) -> Result, Status> + where + F: FnOnce(&MessageBlockMeta) -> Option, + { + let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32); + let commitment = CommitmentLevel::from_i32(commitment).ok_or_else(|| { + let msg = format!("failed to create CommitmentLevel from {commitment:?}"); + Status::unknown(msg) + })?; + + let storage = self.inner.read().await; + + let slot = match commitment { + CommitmentLevel::Processed => storage.processed, + CommitmentLevel::Confirmed => storage.confirmed, + CommitmentLevel::Finalized => storage.finalized, + }; + match slot.and_then(|slot| storage.blocks.get(&slot)) { + Some(block) => match handler(block) { + Some(resp) => Ok(Response::new(resp)), + None => Err(Status::internal("failed to build response")), + }, + None => Err(Status::internal("block is not available yet")), + } + } +} + +fn clean_btree_slots(storage: &mut BTreeMap, keep_slot: u64) { + while let Some(slot) = storage.keys().next().cloned() { + if slot < keep_slot { + storage.remove(&slot); + } else { + break; + } + } +} + #[derive(Debug)] pub struct GrpcService { config: ConfigGrpc, subscribe_id: AtomicUsize, new_clients_tx: mpsc::UnboundedSender, - latest_block_meta: Arc>>, + blocks_meta: BlockMetaStorage, } impl GrpcService { pub fn create( config: ConfigGrpc, - latest_block_meta: Arc>>, ) -> Result< (mpsc::UnboundedSender, oneshot::Sender<()>), Box, @@ -309,20 +409,25 @@ impl GrpcService { Some(Duration::from_secs(20)), // tcp_keepalive )?; + // Blocks meta storage + let (blocks_meta, update_blocks_meta_tx) = BlockMetaStorage::new(); + // Create Server let (new_clients_tx, new_clients_rx) = mpsc::unbounded_channel(); let service = GeyserServer::new(Self { config, subscribe_id: AtomicUsize::new(0), new_clients_tx, - latest_block_meta, + blocks_meta, }) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip); // Run filter and send loop let (update_channel_tx, update_channel_rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { Self::send_loop(update_channel_rx, new_clients_rx).await }); + tokio::spawn(async move { + Self::send_loop(update_channel_rx, new_clients_rx, update_blocks_meta_tx).await + }); // gRPC Health check service let (mut health_reporter, health_service) = health_reporter(); @@ -347,42 +452,61 @@ impl GrpcService { async fn send_loop( mut update_channel_rx: mpsc::UnboundedReceiver, mut new_clients_rx: mpsc::UnboundedReceiver, + update_blocks_meta_tx: mpsc::UnboundedSender, ) { + // Number of slots hold in memory after finalized + const KEEP_SLOTS: u64 = 3; + let mut clients: HashMap = HashMap::new(); + let mut messages: BTreeMap> = BTreeMap::new(); loop { tokio::select! { Some(message) = update_channel_rx.recv() => { - let mut ids_full = vec![]; - let mut ids_closed = vec![]; + if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { + let _ = update_blocks_meta_tx.send(message.clone()); + } - for (id, client) in clients.iter() { - let filters = client.filter.get_filters(&message); - if !filters.is_empty() { - match client.stream_tx.try_send(Ok(SubscribeUpdate { - filters, - update_oneof: Some((&message).into()), - })) { - Ok(()) => {}, - Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(*id), - Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(*id), + let slot = if let Message::Slot(slot) = message { + slot + } else { + messages.entry(message.get_slot()).or_default().push(message); + continue; + }; + + let slot_messages = messages.get(&slot.slot).map(|x| x.as_slice()).unwrap_or_default(); + for message in slot_messages.iter().chain(std::iter::once(&message)) { + let mut ids_full = vec![]; + let mut ids_closed = vec![]; + + for (id, client) in clients.iter() { + if let Some(msg) = client.filter.get_update(message, slot.status) { + match client.stream_tx.try_send(Ok(msg)) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(*id), + Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(*id), + } + } + } + + for id in ids_full { + if let Some(client) = clients.remove(&id) { + tokio::spawn(async move { + CONNECTIONS_TOTAL.dec(); + error!("{}, lagged, close stream", id); + let _ = client.stream_tx.send(Err(Status::internal("lagged"))).await; + }); + } + } + for id in ids_closed { + if let Some(_client) = clients.remove(&id) { + CONNECTIONS_TOTAL.dec(); + error!("{}, client closed stream", id); } } } - for id in ids_full { - if let Some(client) = clients.remove(&id) { - tokio::spawn(async move { - CONNECTIONS_TOTAL.dec(); - error!("{}, lagged, close stream", id); - let _ = client.stream_tx.send(Err(Status::internal("lagged"))).await; - }); - } - } - for id in ids_closed { - if let Some(_client) = clients.remove(&id) { - CONNECTIONS_TOTAL.dec(); - error!("{}, client closed stream", id); - } + if slot.status == CommitmentLevel::Finalized { + clean_btree_slots(&mut messages, slot.slot - KEEP_SLOTS); } }, Some(msg) = new_clients_rx.recv() => { @@ -429,6 +553,7 @@ impl Geyser for GrpcService { transactions: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }, &self.config.filters, ) @@ -493,49 +618,56 @@ impl Geyser for GrpcService { } async fn ping(&self, request: Request) -> Result, Status> { - info!("Got a request from {:?}", request.remote_addr()); - let count = request.get_ref().count; - let response = PongResponse { count: count + 1 }; Ok(Response::new(response)) } async fn get_latest_blockhash( &self, - _request: Request, + request: Request, ) -> Result, Status> { - match self.latest_block_meta.read().await.as_ref() { - Some(block_meta) => Ok(Response::new(GetLatestBlockhashResponse { - slot: block_meta.slot, - blockhash: block_meta.blockhash.clone(), - last_valid_block_height: block_meta.block_height.unwrap(), - })), - None => Err(Status::internal("block_meta is not available yet")), - } + self.blocks_meta + .get_block( + |block| { + block + .block_height + .map(|last_valid_block_height| GetLatestBlockhashResponse { + slot: block.slot, + blockhash: block.blockhash.clone(), + last_valid_block_height, + }) + }, + request.get_ref().commitment, + ) + .await } async fn get_block_height( &self, - _request: Request, + request: Request, ) -> Result, Status> { - match self.latest_block_meta.read().await.as_ref() { - Some(block_meta) => Ok(Response::new(GetBlockHeightResponse { - block_height: block_meta.block_height.unwrap(), - })), - None => Err(Status::internal("block_meta is not available yet")), - } + self.blocks_meta + .get_block( + |block| { + block + .block_height + .map(|block_height| GetBlockHeightResponse { block_height }) + }, + request.get_ref().commitment, + ) + .await } async fn get_slot( &self, - _request: Request, + request: Request, ) -> Result, Status> { - match self.latest_block_meta.read().await.as_ref() { - Some(block_meta) => Ok(Response::new(GetSlotResponse { - slot: block_meta.slot, - })), - None => Err(Status::internal("block_meta is not available yet")), - } + self.blocks_meta + .get_block( + |block| Some(GetSlotResponse { slot: block.slot }), + request.get_ref().commitment, + ) + .await } } diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 0d34430..9e62453 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -11,10 +11,10 @@ use { GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }, - std::{collections::BTreeMap, sync::Arc, time::Duration}, + std::{collections::BTreeMap, time::Duration}, tokio::{ runtime::Runtime, - sync::{mpsc, oneshot, RwLock}, + sync::{mpsc, oneshot}, }, }; @@ -27,7 +27,6 @@ pub struct PluginInner { grpc_shutdown_tx: oneshot::Sender<()>, prometheus: PrometheusService, transactions: BTreeMap, Vec)>, - latest_block_meta_tx: mpsc::UnboundedSender, } impl PluginInner { @@ -78,21 +77,9 @@ impl GeyserPlugin for Plugin { // Create inner let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; - let (latest_block_meta_tx, mut latest_block_meta_rx) = mpsc::unbounded_channel(); - let latest_block_meta = Arc::new(RwLock::new(None)); - - let latest_block_meta2 = latest_block_meta.clone(); - runtime.spawn(async move { - while let Some(block_meta) = latest_block_meta_rx.recv().await { - let mut locked = latest_block_meta2.write().await; - *locked = Some(block_meta); - } - }); - let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move { - let (grpc_channel, grpc_shutdown_tx) = - GrpcService::create(config.grpc, latest_block_meta) - .map_err(|error| GeyserPluginError::Custom(error))?; + let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc) + .map_err(|error| GeyserPluginError::Custom(error))?; let prometheus = PrometheusService::new(config.prometheus) .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus)) @@ -106,7 +93,6 @@ impl GeyserPlugin for Plugin { grpc_shutdown_tx, prometheus, transactions: BTreeMap::new(), - latest_block_meta_tx, }); Ok(()) @@ -235,9 +221,6 @@ impl GeyserPlugin for Plugin { inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone()); inner.try_send_full_block(block_meta.slot); - // Save newest block meta - let _ = inner.latest_block_meta_tx.send(block_meta.clone()); - let message = Message::BlockMeta(block_meta); let _ = inner.grpc_channel.send(message); diff --git a/yellowstone-grpc-geyser/tests/test_filters.rs b/yellowstone-grpc-geyser/tests/test_filters.rs index 8a94b5b..91d7150 100644 --- a/yellowstone-grpc-geyser/tests/test_filters.rs +++ b/yellowstone-grpc-geyser/tests/test_filters.rs @@ -72,6 +72,7 @@ mod tests { transactions: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit); @@ -97,6 +98,7 @@ mod tests { transactions: HashMap::new(), blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let mut limit = ConfigGrpcFilters::default(); limit.accounts.any = false; @@ -127,6 +129,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let mut limit = ConfigGrpcFilters::default(); limit.transactions.any = false; @@ -156,6 +159,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let mut limit = ConfigGrpcFilters::default(); limit.transactions.any = false; @@ -191,6 +195,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit).unwrap(); @@ -229,6 +234,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit).unwrap(); @@ -267,6 +273,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit).unwrap(); @@ -311,6 +318,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit).unwrap(); @@ -357,6 +365,7 @@ mod tests { transactions, blocks: HashMap::new(), blocks_meta: HashMap::new(), + commitment: None, }; let limit = ConfigGrpcFilters::default(); let filter = Filter::new(&config, &limit).unwrap(); diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto index a9dc6e0..81cab49 100644 --- a/yellowstone-grpc-proto/proto/geyser.proto +++ b/yellowstone-grpc-proto/proto/geyser.proto @@ -14,12 +14,19 @@ service Geyser { rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {} } +enum CommitmentLevel { + PROCESSED = 0; + CONFIRMED = 1; + FINALIZED = 2; +} + message SubscribeRequest { map accounts = 1; map slots = 2; map transactions = 3; map blocks = 4; map blocks_meta = 5; + optional CommitmentLevel commitment = 6; } message SubscribeRequestFilterAccounts { @@ -91,13 +98,7 @@ message SubscribeUpdateAccountInfo { message SubscribeUpdateSlot { uint64 slot = 1; optional uint64 parent = 2; - SubscribeUpdateSlotStatus status = 3; -} - -enum SubscribeUpdateSlotStatus { - PROCESSED = 0; - CONFIRMED = 1; - FINALIZED = 2; + CommitmentLevel status = 3; } message SubscribeUpdateTransaction { @@ -149,20 +150,29 @@ message PongResponse { int32 count = 1; } -message GetLatestBlockhashRequest {} +message GetLatestBlockhashRequest { + optional CommitmentLevel commitment = 1; +} + message GetLatestBlockhashResponse { // The latest blockhash uint64 slot = 1; string blockhash = 2; - uint64 lastValidBlockHeight = 3; + uint64 last_valid_block_height = 3; +} + +message GetBlockHeightRequest { + optional CommitmentLevel commitment = 1; } -message GetBlockHeightRequest {} message GetBlockHeightResponse { - uint64 BlockHeight = 1; + uint64 block_height = 1; +} + +message GetSlotRequest { + optional CommitmentLevel commitment = 1; } -message GetSlotRequest {} message GetSlotResponse { uint64 slot = 1; }