114 the grpc framework supports non streaming methods getlatestblockhash (#117)

* feat: unary Ping method for geyser_server

* fix: variables can be used directly in the `format!` string

* feat: Geyser server getLatestBlockhash method

* fix: use RwLock from tokio

by fanatid:
I think it's better to use RwLock from tokio, everything what we have is async

* feat: #112 getBlockHeight & getSlot

* use RwLock and mpsc (#118)

---------

Co-authored-by: Kirill Fomichev <fanatid@ya.ru>
This commit is contained in:
cairo 2023-05-10 09:52:26 +08:00 committed by GitHub
parent 4cbb353be3
commit 61f4d436e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 10 deletions

View File

@ -247,9 +247,9 @@ where
Vec::<&str>::deserialize(deserializer)?
.into_iter()
.map(|value| {
value.parse().map_err(|error| {
de::Error::custom(format!("Invalid pubkey: {} ({:?})", value, error))
})
value
.parse()
.map_err(|error| de::Error::custom(format!("Invalid pubkey: {value} ({error:?})")))
})
.collect::<Result<_, _>>()
}

View File

@ -12,6 +12,10 @@ use {
SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction,
SubscribeUpdateTransactionInfo,
},
proto::{
GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest,
GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, PingRequest, PongResponse,
},
},
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
@ -25,10 +29,11 @@ use {
std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
sync::Arc,
time::Duration,
},
tokio::{
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, RwLock},
time::sleep,
},
tokio_stream::wrappers::ReceiverStream,
@ -282,11 +287,13 @@ pub struct GrpcService {
config: ConfigGrpc,
subscribe_id: AtomicUsize,
new_clients_tx: mpsc::UnboundedSender<ClientMessage>,
latest_block_meta: Arc<RwLock<Option<MessageBlockMeta>>>,
}
impl GrpcService {
pub fn create(
config: ConfigGrpc,
latest_block_meta: Arc<RwLock<Option<MessageBlockMeta>>>,
) -> Result<
(mpsc::UnboundedSender<Message>, oneshot::Sender<()>),
Box<dyn std::error::Error + Send + Sync>,
@ -304,6 +311,7 @@ impl GrpcService {
config,
subscribe_id: AtomicUsize::new(0),
new_clients_tx,
latest_block_meta,
})
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);
@ -458,8 +466,7 @@ impl Geyser for GrpcService {
} {
let _ = stream_tx
.send(Err(Status::invalid_argument(format!(
"failed to create filter: {}",
error
"failed to create filter: {error}"
))))
.await;
}
@ -472,4 +479,51 @@ impl Geyser for GrpcService {
Ok(Response::new(ReceiverStream::new(stream_rx)))
}
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PongResponse>, Status> {
info!("Got a request from {:?}", request.remote_addr());
let count = request.get_ref().count;
let response = PongResponse { count: count + 1 };
Ok(Response::new(response))
}
async fn get_latest_blockhash(
&self,
_request: Request<GetLatestBlockhashRequest>,
) -> Result<Response<GetLatestBlockhashResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() {
Some(block_meta) => Ok(Response::new(GetLatestBlockhashResponse {
slot: block_meta.slot,
blockhash: block_meta.blockhash.clone(),
last_valid_block_height: block_meta.block_height.unwrap(),
})),
None => Err(Status::internal("block_meta is not available yet")),
}
}
async fn get_block_height(
&self,
_request: Request<GetBlockHeightRequest>,
) -> Result<Response<GetBlockHeightResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() {
Some(block_meta) => Ok(Response::new(GetBlockHeightResponse {
block_height: block_meta.block_height.unwrap(),
})),
None => Err(Status::internal("block_meta is not available yet")),
}
}
async fn get_slot(
&self,
_request: Request<GetSlotRequest>,
) -> Result<Response<GetSlotResponse>, Status> {
match self.latest_block_meta.read().await.as_ref() {
Some(block_meta) => Ok(Response::new(GetSlotResponse {
slot: block_meta.slot,
})),
None => Err(Status::internal("block_meta is not available yet")),
}
}
}

View File

@ -11,10 +11,10 @@ use {
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
},
std::{collections::BTreeMap, time::Duration},
std::{collections::BTreeMap, sync::Arc, time::Duration},
tokio::{
runtime::Runtime,
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, RwLock},
},
};
@ -27,6 +27,7 @@ pub struct PluginInner {
grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService,
transactions: BTreeMap<u64, (Option<MessageBlockMeta>, Vec<MessageTransactionInfo>)>,
latest_block_meta_tx: mpsc::UnboundedSender<MessageBlockMeta>,
}
impl PluginInner {
@ -76,9 +77,22 @@ impl GeyserPlugin for Plugin {
// Create inner
let runtime = Runtime::new().map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
let (latest_block_meta_tx, mut latest_block_meta_rx) = mpsc::unbounded_channel();
let latest_block_meta = Arc::new(RwLock::new(None));
let latest_block_meta2 = latest_block_meta.clone();
runtime.spawn(async move {
while let Some(block_meta) = latest_block_meta_rx.recv().await {
let mut locked = latest_block_meta2.write().await;
*locked = Some(block_meta);
}
});
let (grpc_channel, grpc_shutdown_tx, prometheus) = runtime.block_on(async move {
let (grpc_channel, grpc_shutdown_tx) = GrpcService::create(config.grpc)
.map_err(|error| GeyserPluginError::Custom(error))?;
let (grpc_channel, grpc_shutdown_tx) =
GrpcService::create(config.grpc, latest_block_meta)
.map_err(|error| GeyserPluginError::Custom(error))?;
let prometheus = PrometheusService::new(config.prometheus)
.map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
Ok::<_, GeyserPluginError>((grpc_channel, grpc_shutdown_tx, prometheus))
@ -92,6 +106,7 @@ impl GeyserPlugin for Plugin {
grpc_shutdown_tx,
prometheus,
transactions: BTreeMap::new(),
latest_block_meta_tx,
});
Ok(())
@ -220,6 +235,9 @@ impl GeyserPlugin for Plugin {
inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone());
inner.try_send_full_block(block_meta.slot);
// Save newest block meta
let _ = inner.latest_block_meta_tx.send(block_meta.clone());
let message = Message::BlockMeta(block_meta);
let _ = inner.grpc_channel.send(message);

View File

@ -8,6 +8,10 @@ package geyser;
service Geyser {
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeUpdate) {}
rpc Ping(PingRequest) returns (PongResponse) {}
rpc GetLatestBlockhash(GetLatestBlockhashRequest) returns (GetLatestBlockhashResponse) {}
rpc GetBlockHeight(GetBlockHeightRequest) returns (GetBlockHeightResponse) {}
rpc GetSlot(GetSlotRequest) returns (GetSlotResponse) {}
}
message SubscribeRequest {
@ -131,3 +135,34 @@ message SubscribeUpdateBlockMeta {
}
message SubscribeUpdatePing {}
// non-streaming methods
// for testing purposes
// Ping is used to check if the server is up and to measure the response time.
message PingRequest {
int32 count = 1;
}
message PongResponse {
int32 count = 1;
}
message GetLatestBlockhashRequest {}
message GetLatestBlockhashResponse {
// The latest blockhash
uint64 slot = 1;
string blockhash = 2;
uint64 lastValidBlockHeight = 3;
}
message GetBlockHeightRequest {}
message GetBlockHeightResponse {
uint64 BlockHeight = 1;
}
message GetSlotRequest {}
message GetSlotResponse {
uint64 slot = 1;
}