grpc, proto: add commitment level (#128)

This commit is contained in:
Kirill Fomichev 2023-05-25 11:31:08 -04:00 committed by GitHub
parent 8ee39a04b5
commit 968dce9463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 346 additions and 135 deletions

View File

@ -1,6 +1,6 @@
use { use {
backoff::{future::retry, ExponentialBackoff}, backoff::{future::retry, ExponentialBackoff},
clap::{Parser, Subcommand}, clap::{Parser, Subcommand, ValueEnum},
futures::{sink::SinkExt, stream::StreamExt}, futures::{sink::SinkExt, stream::StreamExt},
log::{error, info}, log::{error, info},
solana_sdk::pubkey::Pubkey, solana_sdk::pubkey::Pubkey,
@ -10,11 +10,11 @@ use {
prelude::{ prelude::{
subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterAccounts, subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeUpdateAccount, SubscribeRequestFilterTransactions, SubscribeUpdateAccount,
}, },
tonic::service::Interceptor, tonic::service::Interceptor,
}, },
@ -36,10 +36,38 @@ struct Args {
#[clap(long)] #[clap(long)]
x_token: Option<String>, x_token: Option<String>,
/// Commitment level: processed, confirmed or finalized
#[clap(long)]
commitment: Option<ArgsCommitment>,
#[command(subcommand)] #[command(subcommand)]
action: Action, action: Action,
} }
impl Args {
fn get_commitment(&self) -> Option<CommitmentLevel> {
Some(self.commitment.unwrap_or_default().into())
}
}
#[derive(Debug, Clone, Copy, Default, ValueEnum)]
enum ArgsCommitment {
Processed,
Confirmed,
#[default]
Finalized,
}
impl From<ArgsCommitment> for CommitmentLevel {
fn from(commitment: ArgsCommitment) -> Self {
match commitment {
ArgsCommitment::Processed => CommitmentLevel::Processed,
ArgsCommitment::Confirmed => CommitmentLevel::Confirmed,
ArgsCommitment::Finalized => CommitmentLevel::Finalized,
}
}
}
#[derive(Debug, Clone, Subcommand)] #[derive(Debug, Clone, Subcommand)]
enum Action { enum Action {
Subscribe(Box<ActionSubscribe>), Subscribe(Box<ActionSubscribe>),
@ -120,7 +148,10 @@ struct ActionSubscribe {
} }
impl Action { impl Action {
fn get_subscribe_request(&self) -> anyhow::Result<Option<(SubscribeRequest, usize)>> { fn get_subscribe_request(
&self,
commitment: Option<CommitmentLevel>,
) -> anyhow::Result<Option<(SubscribeRequest, usize)>> {
Ok(match self { Ok(match self {
Self::Subscribe(args) => { Self::Subscribe(args) => {
let mut accounts: AccountFilterMap = HashMap::new(); let mut accounts: AccountFilterMap = HashMap::new();
@ -198,6 +229,7 @@ impl Action {
transactions, transactions,
blocks, blocks,
blocks_meta, blocks_meta,
commitment: commitment.map(|x| x as i32),
}, },
args.resub.unwrap_or(0), args.resub.unwrap_or(0),
)) ))
@ -264,6 +296,8 @@ async fn main() -> anyhow::Result<()> {
async move { async move {
info!("Retry to connect to the server"); info!("Retry to connect to the server");
let commitment = args.get_commitment();
let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None) let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)
.map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?; .map_err(|e| backoff::Error::transient(anyhow::Error::new(e)))?;
@ -271,7 +305,7 @@ async fn main() -> anyhow::Result<()> {
Action::Subscribe(_) => { Action::Subscribe(_) => {
let (request, resub) = args let (request, resub) = args
.action .action
.get_subscribe_request() .get_subscribe_request(commitment)
.map_err(backoff::Error::Permanent)? .map_err(backoff::Error::Permanent)?
.expect("expect subscribe action"); .expect("expect subscribe action");
@ -283,17 +317,17 @@ async fn main() -> anyhow::Result<()> {
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {:?}", response)),
Action::GetLatestBlockhash => client Action::GetLatestBlockhash => client
.get_latest_blockhash() .get_latest_blockhash(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {:?}", response)),
Action::GetBlockHeight => client Action::GetBlockHeight => client
.get_block_height() .get_block_height(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {:?}", response)),
Action::GetSlot => client Action::GetSlot => client
.get_slot() .get_slot(commitment)
.await .await
.map_err(anyhow::Error::new) .map_err(anyhow::Error::new)
.map(|response| info!("response: {:?}", response)), .map(|response| info!("response: {:?}", response)),
@ -353,6 +387,7 @@ async fn geyser_subscribe(
transactions: HashMap::default(), transactions: HashMap::default(),
blocks: HashMap::default(), blocks: HashMap::default(),
blocks_meta: HashMap::default(), blocks_meta: HashMap::default(),
commitment: None,
}) })
.await .await
.map_err(GeyserGrpcClientError::SubscribeSendError)?; .map_err(GeyserGrpcClientError::SubscribeSendError)?;

View File

@ -14,15 +14,13 @@ use {
transport::channel::{Channel, ClientTlsConfig}, transport::channel::{Channel, ClientTlsConfig},
Request, Response, Status, Request, Response, Status,
}, },
yellowstone_grpc_proto::{ yellowstone_grpc_proto::prelude::{
geyser::{GetSlotResponse, PongResponse}, geyser_client::GeyserClient, CommitmentLevel, GetBlockHeightRequest,
prelude::{ GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse,
geyser_client::GeyserClient, GetBlockHeightRequest, GetBlockHeightResponse, GetSlotRequest, GetSlotResponse, PingRequest, PongResponse, SubscribeRequest,
GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, PingRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdate,
SubscribeRequestFilterTransactions, SubscribeUpdate,
},
}, },
}; };
@ -109,6 +107,7 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
transactions: HashMap<String, SubscribeRequestFilterTransactions>, transactions: HashMap<String, SubscribeRequestFilterTransactions>,
blocks: HashMap<String, SubscribeRequestFilterBlocks>, blocks: HashMap<String, SubscribeRequestFilterBlocks>,
blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>, blocks_meta: HashMap<String, SubscribeRequestFilterBlocksMeta>,
commitment: Option<CommitmentLevel>,
) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> { ) -> GeyserGrpcClientResult<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
let (mut subscribe_tx, response) = self.subscribe().await?; let (mut subscribe_tx, response) = self.subscribe().await?;
subscribe_tx subscribe_tx
@ -118,6 +117,7 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
transactions, transactions,
blocks, blocks,
blocks_meta, blocks_meta,
commitment: commitment.map(|value| value as i32),
}) })
.await?; .await?;
Ok(response) Ok(response)
@ -132,20 +132,33 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
pub async fn get_latest_blockhash( pub async fn get_latest_blockhash(
&mut self, &mut self,
commitment: Option<CommitmentLevel>,
) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> { ) -> GeyserGrpcClientResult<GetLatestBlockhashResponse> {
let request = tonic::Request::new(GetLatestBlockhashRequest {}); let request = tonic::Request::new(GetLatestBlockhashRequest {
commitment: commitment.map(|value| value as i32),
});
let response = self.client.get_latest_blockhash(request).await?; let response = self.client.get_latest_blockhash(request).await?;
Ok(response.into_inner()) Ok(response.into_inner())
} }
pub async fn get_block_height(&mut self) -> GeyserGrpcClientResult<GetBlockHeightResponse> { pub async fn get_block_height(
let request = tonic::Request::new(GetBlockHeightRequest {}); &mut self,
commitment: Option<CommitmentLevel>,
) -> GeyserGrpcClientResult<GetBlockHeightResponse> {
let request = tonic::Request::new(GetBlockHeightRequest {
commitment: commitment.map(|value| value as i32),
});
let response = self.client.get_block_height(request).await?; let response = self.client.get_block_height(request).await?;
Ok(response.into_inner()) Ok(response.into_inner())
} }
pub async fn get_slot(&mut self) -> GeyserGrpcClientResult<GetSlotResponse> { pub async fn get_slot(
let request = tonic::Request::new(GetSlotRequest {}); &mut self,
commitment: Option<CommitmentLevel>,
) -> GeyserGrpcClientResult<GetSlotResponse> {
let request = tonic::Request::new(GetSlotRequest {
commitment: commitment.map(|value| value as i32),
});
let response = self.client.get_slot(request).await?; let response = self.client.get_slot(request).await?;
Ok(response.into_inner()) Ok(response.into_inner())
} }

View File

@ -11,9 +11,10 @@ use {
proto::{ proto::{
subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof,
subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof,
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions, SubscribeUpdate,
}, },
}, },
base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, base64::{engine::general_purpose::STANDARD as base64_engine, Engine},
@ -32,6 +33,7 @@ pub struct Filter {
transactions: FilterTransactions, transactions: FilterTransactions,
blocks: FilterBlocks, blocks: FilterBlocks,
blocks_meta: FilterBlocksMeta, blocks_meta: FilterBlocksMeta,
commitment: CommitmentLevel,
} }
impl Filter { impl Filter {
@ -42,9 +44,16 @@ impl Filter {
transactions: FilterTransactions::new(&config.transactions, &limit.transactions)?, transactions: FilterTransactions::new(&config.transactions, &limit.transactions)?,
blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?, blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?,
commitment: Self::decode_commitment(config.commitment)?,
}) })
} }
fn decode_commitment(commitment: Option<i32>) -> anyhow::Result<CommitmentLevel> {
let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32);
CommitmentLevel::from_i32(commitment)
.ok_or_else(|| anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}"))
}
fn decode_pubkeys<T: FromIterator<Pubkey>>( fn decode_pubkeys<T: FromIterator<Pubkey>>(
pubkeys: &[String], pubkeys: &[String],
limit: &HashSet<Pubkey>, limit: &HashSet<Pubkey>,
@ -70,6 +79,26 @@ impl Filter {
Message::BlockMeta(message) => self.blocks_meta.get_filters(message), Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
} }
} }
pub fn get_update(
&self,
message: &Message,
commitment: CommitmentLevel,
) -> Option<SubscribeUpdate> {
if commitment == self.commitment || matches!(message, Message::Slot(_)) {
let filters = self.get_filters(message);
if filters.is_empty() {
None
} else {
Some(SubscribeUpdate {
filters,
update_oneof: Some(message.into()),
})
}
} else {
None
}
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]

View File

@ -7,15 +7,13 @@ use {
self, self,
geyser_server::{Geyser, GeyserServer}, geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof, subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse,
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse,
SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount,
SubscribeUpdateAccountInfo, SubscribeUpdateBlock, SubscribeUpdateBlockMeta,
SubscribeUpdatePing, SubscribeUpdateSlot, SubscribeUpdateTransaction,
SubscribeUpdateTransactionInfo, SubscribeUpdateTransactionInfo,
}, },
proto::{
GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest,
GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, PingRequest, PongResponse,
},
}, },
log::*, log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{ solana_geyser_plugin_interface::geyser_plugin_interface::{
@ -27,7 +25,7 @@ use {
}, },
solana_transaction_status::{Reward, TransactionStatusMeta}, solana_transaction_status::{Reward, TransactionStatusMeta},
std::{ std::{
collections::HashMap, collections::{BTreeMap, HashMap},
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
@ -45,7 +43,7 @@ use {
tonic_health::server::health_reporter, tonic_health::server::health_reporter,
}; };
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MessageAccountInfo { pub struct MessageAccountInfo {
pub pubkey: Pubkey, pub pubkey: Pubkey,
pub lamports: u64, pub lamports: u64,
@ -57,7 +55,7 @@ pub struct MessageAccountInfo {
pub txn_signature: Option<Signature>, pub txn_signature: Option<Signature>,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MessageAccount { pub struct MessageAccount {
pub account: MessageAccountInfo, pub account: MessageAccountInfo,
pub slot: u64, pub slot: u64,
@ -83,11 +81,11 @@ impl<'a> From<(&'a ReplicaAccountInfoV2<'a>, u64, bool)> for MessageAccount {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone, Copy)]
pub struct MessageSlot { pub struct MessageSlot {
pub slot: u64, pub slot: u64,
pub parent: Option<u64>, pub parent: Option<u64>,
pub status: SubscribeUpdateSlotStatus, pub status: CommitmentLevel,
} }
impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot { impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot {
@ -96,9 +94,9 @@ impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot {
slot, slot,
parent, parent,
status: match status { status: match status {
SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed, SlotStatus::Processed => CommitmentLevel::Processed,
SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed, SlotStatus::Confirmed => CommitmentLevel::Confirmed,
SlotStatus::Rooted => SubscribeUpdateSlotStatus::Finalized, SlotStatus::Rooted => CommitmentLevel::Finalized,
}, },
} }
} }
@ -125,7 +123,7 @@ impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MessageTransaction { pub struct MessageTransaction {
pub transaction: MessageTransactionInfo, pub transaction: MessageTransactionInfo,
pub slot: u64, pub slot: u64,
@ -146,7 +144,7 @@ impl<'a> From<(&'a ReplicaTransactionInfoV2<'a>, u64)> for MessageTransaction {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MessageBlock { pub struct MessageBlock {
pub parent_slot: u64, pub parent_slot: u64,
pub slot: u64, pub slot: u64,
@ -200,7 +198,7 @@ impl<'a> From<&'a ReplicaBlockInfoV2<'a>> for MessageBlockMeta {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum Message { pub enum Message {
Slot(MessageSlot), Slot(MessageSlot),
@ -264,6 +262,18 @@ impl From<&Message> for UpdateOneof {
} }
} }
impl Message {
pub fn get_slot(&self) -> u64 {
match self {
Self::Slot(msg) => msg.slot,
Self::Account(msg) => msg.slot,
Self::Transaction(msg) => msg.slot,
Self::Block(msg) => msg.slot,
Self::BlockMeta(msg) => msg.slot,
}
}
}
#[derive(Debug)] #[derive(Debug)]
enum ClientMessage { enum ClientMessage {
New { New {
@ -286,18 +296,108 @@ struct ClientConnection {
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>, stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
} }
#[derive(Debug, Default)]
struct BlockMetaStorageInner {
blocks: BTreeMap<u64, MessageBlockMeta>,
processed: Option<u64>,
confirmed: Option<u64>,
finalized: Option<u64>,
}
#[derive(Debug)]
struct BlockMetaStorage {
inner: Arc<RwLock<BlockMetaStorageInner>>,
}
impl BlockMetaStorage {
fn new() -> (Self, mpsc::UnboundedSender<Message>) {
let inner = Arc::new(RwLock::new(BlockMetaStorageInner::default()));
let (tx, mut rx) = mpsc::unbounded_channel();
let storage = Arc::clone(&inner);
tokio::spawn(async move {
const KEEP_SLOTS: u64 = 3;
while let Some(message) = rx.recv().await {
let mut storage = storage.write().await;
match message {
Message::Slot(msg) => {
match msg.status {
CommitmentLevel::Processed => &mut storage.processed,
CommitmentLevel::Confirmed => &mut storage.confirmed,
CommitmentLevel::Finalized => &mut storage.finalized,
}
.replace(msg.slot);
if msg.status == CommitmentLevel::Finalized {
clean_btree_slots(&mut storage.blocks, msg.slot - KEEP_SLOTS);
}
}
Message::BlockMeta(msg) => {
storage.blocks.insert(msg.slot, msg);
}
msg => {
error!("invalid message in BlockMetaStorage: {msg:?}");
}
}
}
});
(Self { inner }, tx)
}
async fn get_block<F, T>(
&self,
handler: F,
commitment: Option<i32>,
) -> Result<Response<T>, Status>
where
F: FnOnce(&MessageBlockMeta) -> Option<T>,
{
let commitment = commitment.unwrap_or(CommitmentLevel::Finalized as i32);
let commitment = CommitmentLevel::from_i32(commitment).ok_or_else(|| {
let msg = format!("failed to create CommitmentLevel from {commitment:?}");
Status::unknown(msg)
})?;
let storage = self.inner.read().await;
let slot = match commitment {
CommitmentLevel::Processed => storage.processed,
CommitmentLevel::Confirmed => storage.confirmed,
CommitmentLevel::Finalized => storage.finalized,
};
match slot.and_then(|slot| storage.blocks.get(&slot)) {
Some(block) => match handler(block) {
Some(resp) => Ok(Response::new(resp)),
None => Err(Status::internal("failed to build response")),
},
None => Err(Status::internal("block is not available yet")),
}
}
}
fn clean_btree_slots<T>(storage: &mut BTreeMap<u64, T>, keep_slot: u64) {
while let Some(slot) = storage.keys().next().cloned() {
if slot < keep_slot {
storage.remove(&slot);
} else {
break;
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct GrpcService { pub struct GrpcService {
config: ConfigGrpc, config: ConfigGrpc,
subscribe_id: AtomicUsize, subscribe_id: AtomicUsize,
new_clients_tx: mpsc::UnboundedSender<ClientMessage>, new_clients_tx: mpsc::UnboundedSender<ClientMessage>,
latest_block_meta: Arc<RwLock<Option<MessageBlockMeta>>>, blocks_meta: BlockMetaStorage,
} }
impl GrpcService { impl GrpcService {
pub fn create( pub fn create(
config: ConfigGrpc, config: ConfigGrpc,
latest_block_meta: Arc<RwLock<Option<MessageBlockMeta>>>,
) -> Result< ) -> Result<
(mpsc::UnboundedSender<Message>, oneshot::Sender<()>), (mpsc::UnboundedSender<Message>, oneshot::Sender<()>),
Box<dyn std::error::Error + Send + Sync>, Box<dyn std::error::Error + Send + Sync>,
@ -309,20 +409,25 @@ impl GrpcService {
Some(Duration::from_secs(20)), // tcp_keepalive Some(Duration::from_secs(20)), // tcp_keepalive
)?; )?;
// Blocks meta storage
let (blocks_meta, update_blocks_meta_tx) = BlockMetaStorage::new();
// Create Server // Create Server
let (new_clients_tx, new_clients_rx) = mpsc::unbounded_channel(); let (new_clients_tx, new_clients_rx) = mpsc::unbounded_channel();
let service = GeyserServer::new(Self { let service = GeyserServer::new(Self {
config, config,
subscribe_id: AtomicUsize::new(0), subscribe_id: AtomicUsize::new(0),
new_clients_tx, new_clients_tx,
latest_block_meta, blocks_meta,
}) })
.accept_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip); .send_compressed(CompressionEncoding::Gzip);
// Run filter and send loop // Run filter and send loop
let (update_channel_tx, update_channel_rx) = mpsc::unbounded_channel(); let (update_channel_tx, update_channel_rx) = mpsc::unbounded_channel();
tokio::spawn(async move { Self::send_loop(update_channel_rx, new_clients_rx).await }); tokio::spawn(async move {
Self::send_loop(update_channel_rx, new_clients_rx, update_blocks_meta_tx).await
});
// gRPC Health check service // gRPC Health check service
let (mut health_reporter, health_service) = health_reporter(); let (mut health_reporter, health_service) = health_reporter();
@ -347,42 +452,61 @@ impl GrpcService {
async fn send_loop( async fn send_loop(
mut update_channel_rx: mpsc::UnboundedReceiver<Message>, mut update_channel_rx: mpsc::UnboundedReceiver<Message>,
mut new_clients_rx: mpsc::UnboundedReceiver<ClientMessage>, mut new_clients_rx: mpsc::UnboundedReceiver<ClientMessage>,
update_blocks_meta_tx: mpsc::UnboundedSender<Message>,
) { ) {
// Number of slots hold in memory after finalized
const KEEP_SLOTS: u64 = 3;
let mut clients: HashMap<usize, ClientConnection> = HashMap::new(); let mut clients: HashMap<usize, ClientConnection> = HashMap::new();
let mut messages: BTreeMap<u64, Vec<Message>> = BTreeMap::new();
loop { loop {
tokio::select! { tokio::select! {
Some(message) = update_channel_rx.recv() => { Some(message) = update_channel_rx.recv() => {
let mut ids_full = vec![]; if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let mut ids_closed = vec![]; let _ = update_blocks_meta_tx.send(message.clone());
}
for (id, client) in clients.iter() { let slot = if let Message::Slot(slot) = message {
let filters = client.filter.get_filters(&message); slot
if !filters.is_empty() { } else {
match client.stream_tx.try_send(Ok(SubscribeUpdate { messages.entry(message.get_slot()).or_default().push(message);
filters, continue;
update_oneof: Some((&message).into()), };
})) {
Ok(()) => {}, let slot_messages = messages.get(&slot.slot).map(|x| x.as_slice()).unwrap_or_default();
Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(*id), for message in slot_messages.iter().chain(std::iter::once(&message)) {
Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(*id), let mut ids_full = vec![];
let mut ids_closed = vec![];
for (id, client) in clients.iter() {
if let Some(msg) = client.filter.get_update(message, slot.status) {
match client.stream_tx.try_send(Ok(msg)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(*id),
Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(*id),
}
}
}
for id in ids_full {
if let Some(client) = clients.remove(&id) {
tokio::spawn(async move {
CONNECTIONS_TOTAL.dec();
error!("{}, lagged, close stream", id);
let _ = client.stream_tx.send(Err(Status::internal("lagged"))).await;
});
}
}
for id in ids_closed {
if let Some(_client) = clients.remove(&id) {
CONNECTIONS_TOTAL.dec();
error!("{}, client closed stream", id);
} }
} }
} }
for id in ids_full { if slot.status == CommitmentLevel::Finalized {
if let Some(client) = clients.remove(&id) { clean_btree_slots(&mut messages, slot.slot - KEEP_SLOTS);
tokio::spawn(async move {
CONNECTIONS_TOTAL.dec();
error!("{}, lagged, close stream", id);
let _ = client.stream_tx.send(Err(Status::internal("lagged"))).await;
});
}
}
for id in ids_closed {
if let Some(_client) = clients.remove(&id) {
CONNECTIONS_TOTAL.dec();
error!("{}, client closed stream", id);
}
} }
}, },
Some(msg) = new_clients_rx.recv() => { Some(msg) = new_clients_rx.recv() => {
@ -429,6 +553,7 @@ impl Geyser for GrpcService {
transactions: HashMap::new(), transactions: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}, },
&self.config.filters, &self.config.filters,
) )
@ -493,49 +618,56 @@ impl Geyser for GrpcService {
} }
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PongResponse>, Status> { async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PongResponse>, Status> {
info!("Got a request from {:?}", request.remote_addr());
let count = request.get_ref().count; let count = request.get_ref().count;
let response = PongResponse { count: count + 1 }; let response = PongResponse { count: count + 1 };
Ok(Response::new(response)) Ok(Response::new(response))
} }
async fn get_latest_blockhash( async fn get_latest_blockhash(
&self, &self,
_request: Request<GetLatestBlockhashRequest>, request: Request<GetLatestBlockhashRequest>,
) -> Result<Response<GetLatestBlockhashResponse>, Status> { ) -> Result<Response<GetLatestBlockhashResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() { self.blocks_meta
Some(block_meta) => Ok(Response::new(GetLatestBlockhashResponse { .get_block(
slot: block_meta.slot, |block| {
blockhash: block_meta.blockhash.clone(), block
last_valid_block_height: block_meta.block_height.unwrap(), .block_height
})), .map(|last_valid_block_height| GetLatestBlockhashResponse {
None => Err(Status::internal("block_meta is not available yet")), slot: block.slot,
} blockhash: block.blockhash.clone(),
last_valid_block_height,
})
},
request.get_ref().commitment,
)
.await
} }
async fn get_block_height( async fn get_block_height(
&self, &self,
_request: Request<GetBlockHeightRequest>, request: Request<GetBlockHeightRequest>,
) -> Result<Response<GetBlockHeightResponse>, Status> { ) -> Result<Response<GetBlockHeightResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() { self.blocks_meta
Some(block_meta) => Ok(Response::new(GetBlockHeightResponse { .get_block(
block_height: block_meta.block_height.unwrap(), |block| {
})), block
None => Err(Status::internal("block_meta is not available yet")), .block_height
} .map(|block_height| GetBlockHeightResponse { block_height })
},
request.get_ref().commitment,
)
.await
} }
async fn get_slot( async fn get_slot(
&self, &self,
_request: Request<GetSlotRequest>, request: Request<GetSlotRequest>,
) -> Result<Response<GetSlotResponse>, Status> { ) -> Result<Response<GetSlotResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() { self.blocks_meta
Some(block_meta) => Ok(Response::new(GetSlotResponse { .get_block(
slot: block_meta.slot, |block| Some(GetSlotResponse { slot: block.slot }),
})), request.get_ref().commitment,
None => Err(Status::internal("block_meta is not available yet")), )
} .await
} }
} }

View File

@ -11,10 +11,10 @@ use {
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
}, },
std::{collections::BTreeMap, sync::Arc, time::Duration}, std::{collections::BTreeMap, time::Duration},
tokio::{ tokio::{
runtime::Runtime, runtime::Runtime,
sync::{mpsc, oneshot, RwLock}, sync::{mpsc, oneshot},
}, },
}; };
@ -27,7 +27,6 @@ pub struct PluginInner {
grpc_shutdown_tx: oneshot::Sender<()>, grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService, prometheus: PrometheusService,
transactions: BTreeMap<u64, (Option<MessageBlockMeta>, Vec<MessageTransactionInfo>)>, transactions: BTreeMap<u64, (Option<MessageBlockMeta>, Vec<MessageTransactionInfo>)>,
latest_block_meta_tx: mpsc::UnboundedSender<MessageBlockMeta>,
} }
impl PluginInner { impl PluginInner {
@ -78,21 +77,9 @@ impl GeyserPlugin for Plugin {
// Create inner // Create inner
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let (latest_block_meta_tx, mut latest_block_meta_rx) = mpsc::unbounded_channel();
let latest_block_meta = Arc::new(RwLock::new(None));
let latest_block_meta2 = latest_block_meta.clone();
runtime.spawn(async move {
while let Some(block_meta) = latest_block_meta_rx.recv().await {
let mut locked = latest_block_meta2.write().await;
*locked = Some(block_meta);
}
});
let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move { let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move {
let (grpc_channel, grpc_shutdown_tx) = let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc)
GrpcService::create(config.grpc, latest_block_meta) .map_err(|error| GeyserPluginError::Custom(error))?;
.map_err(|error| GeyserPluginError::Custom(error))?;
let prometheus = PrometheusService::new(config.prometheus) let prometheus = PrometheusService::new(config.prometheus)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus)) Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus))
@ -106,7 +93,6 @@ impl GeyserPlugin for Plugin {
grpc_shutdown_tx, grpc_shutdown_tx,
prometheus, prometheus,
transactions: BTreeMap::new(), transactions: BTreeMap::new(),
latest_block_meta_tx,
}); });
Ok(()) Ok(())
@ -235,9 +221,6 @@ impl GeyserPlugin for Plugin {
inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone()); inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone());
inner.try_send_full_block(block_meta.slot); inner.try_send_full_block(block_meta.slot);
// Save newest block meta
let _ = inner.latest_block_meta_tx.send(block_meta.clone());
let message = Message::BlockMeta(block_meta); let message = Message::BlockMeta(block_meta);
let _ = inner.grpc_channel.send(message); let _ = inner.grpc_channel.send(message);

View File

@ -72,6 +72,7 @@ mod tests {
transactions: HashMap::new(), transactions: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit); let filter = Filter::new(&config, &limit);
@ -97,6 +98,7 @@ mod tests {
transactions: HashMap::new(), transactions: HashMap::new(),
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let mut limit = ConfigGrpcFilters::default(); let mut limit = ConfigGrpcFilters::default();
limit.accounts.any = false; limit.accounts.any = false;
@ -127,6 +129,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let mut limit = ConfigGrpcFilters::default(); let mut limit = ConfigGrpcFilters::default();
limit.transactions.any = false; limit.transactions.any = false;
@ -156,6 +159,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let mut limit = ConfigGrpcFilters::default(); let mut limit = ConfigGrpcFilters::default();
limit.transactions.any = false; limit.transactions.any = false;
@ -191,6 +195,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit).unwrap(); let filter = Filter::new(&config, &limit).unwrap();
@ -229,6 +234,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit).unwrap(); let filter = Filter::new(&config, &limit).unwrap();
@ -267,6 +273,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit).unwrap(); let filter = Filter::new(&config, &limit).unwrap();
@ -311,6 +318,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit).unwrap(); let filter = Filter::new(&config, &limit).unwrap();
@ -357,6 +365,7 @@ mod tests {
transactions, transactions,
blocks: HashMap::new(), blocks: HashMap::new(),
blocks_meta: HashMap::new(), blocks_meta: HashMap::new(),
commitment: None,
}; };
let limit = ConfigGrpcFilters::default(); let limit = ConfigGrpcFilters::default();
let filter = Filter::new(&config, &limit).unwrap(); let filter = Filter::new(&config, &limit).unwrap();

View File

@ -14,12 +14,19 @@ service Geyser {
rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {} rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {}
} }
enum CommitmentLevel {
PROCESSED = 0;
CONFIRMED = 1;
FINALIZED = 2;
}
message SubscribeRequest { message SubscribeRequest {
map<string, SubscribeRequestFilterAccounts> accounts = 1; map<string, SubscribeRequestFilterAccounts> accounts = 1;
map<string, SubscribeRequestFilterSlots> slots = 2; map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3; map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4; map<string, SubscribeRequestFilterBlocks> blocks = 4;
map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5; map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5;
optional CommitmentLevel commitment = 6;
} }
message SubscribeRequestFilterAccounts { message SubscribeRequestFilterAccounts {
@ -91,13 +98,7 @@ message SubscribeUpdateAccountInfo {
message SubscribeUpdateSlot { message SubscribeUpdateSlot {
uint64 slot = 1; uint64 slot = 1;
optional uint64 parent = 2; optional uint64 parent = 2;
SubscribeUpdateSlotStatus status = 3; CommitmentLevel status = 3;
}
enum SubscribeUpdateSlotStatus {
PROCESSED = 0;
CONFIRMED = 1;
FINALIZED = 2;
} }
message SubscribeUpdateTransaction { message SubscribeUpdateTransaction {
@ -149,20 +150,29 @@ message PongResponse {
int32 count = 1; int32 count = 1;
} }
message GetLatestBlockhashRequest {} message GetLatestBlockhashRequest {
optional CommitmentLevel commitment = 1;
}
message GetLatestBlockhashResponse { message GetLatestBlockhashResponse {
// The latest blockhash // The latest blockhash
uint64 slot = 1; uint64 slot = 1;
string blockhash = 2; string blockhash = 2;
uint64 lastValidBlockHeight = 3; uint64 last_valid_block_height = 3;
}
message GetBlockHeightRequest {
optional CommitmentLevel commitment = 1;
} }
message GetBlockHeightRequest {}
message GetBlockHeightResponse { message GetBlockHeightResponse {
uint64 BlockHeight = 1; uint64 block_height = 1;
}
message GetSlotRequest {
optional CommitmentLevel commitment = 1;
} }
message GetSlotRequest {}
message GetSlotResponse { message GetSlotResponse {
uint64 slot = 1; uint64 slot = 1;
} }