add epoch_info and slot RPC call

musitdev 2023-09-19 18:30:02 +02:00
parent 01dae17ea7
commit f784f7673d
4 changed files with 147 additions and 35 deletions

View File

@@ -25,10 +25,10 @@ reqwest = "0.11"
serde = "1.0"
serde_json = "1.0"
jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] }
jsonrpsee-core = "0.20.0"
jsonrpsee-server = "0.20.0"
jsonrpsee-proc-macros = "0.20.0"
jsonrpsee-types = "0.20.0"
#jsonrpsee-core = "0.20.0"
#jsonrpsee-server = "0.20.0"
#jsonrpsee-proc-macros = "0.20.0"
#jsonrpsee-types = "0.20.0"
thiserror = "1.0.40"
yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" }

View File

@@ -2,9 +2,9 @@ use crate::leader_schedule::LeaderScheduleEvent;
use crate::Slot;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
#[derive(Debug)]
@@ -38,7 +38,7 @@ impl CurrentEpochSlotState {
&mut self,
new_slot: &SubscribeUpdateSlot,
) -> Option<LeaderScheduleEvent> {
if let CommitmentLevel::Confirmed = new_slot.status() {
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
//on the first slot update, correct the epoch info data.
if self.current_slot.confirmed_slot == 0 {
let diff = new_slot.slot - self.current_epoch.absolute_slot;
@@ -82,7 +82,7 @@ impl CurrentEpochSlotState {
//update slot state for all commitment.
self.current_slot.update_slot(&new_slot);
if let CommitmentLevel::Confirmed = new_slot.status() {
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
self.manage_change_epoch()
} else {
None
@@ -128,6 +128,14 @@ pub struct CurrentSlot {
}
impl CurrentSlot {
pub fn get_slot_with_commitment(&self, commitment: CommitmentConfig) -> Slot {
match commitment.commitment {
CommitmentLevel::Processed => self.processed_slot,
CommitmentLevel::Confirmed => self.confirmed_slot,
CommitmentLevel::Finalized => self.finalized_slot,
}
}
fn update_slot(&mut self, slot: &SubscribeUpdateSlot) {
let updade = |commitment: &str, current_slot: &mut u64, new_slot: u64| {
//verify that the slot is consecutive
@@ -142,9 +150,15 @@
};
match slot.status() {
CommitmentLevel::Processed => updade("Processed", &mut self.processed_slot, slot.slot),
CommitmentLevel::Confirmed => updade("Confirmed", &mut self.confirmed_slot, slot.slot),
CommitmentLevel::Finalized => updade("Finalized", &mut self.finalized_slot, slot.slot),
GeyserCommitmentLevel::Processed => {
updade("Processed", &mut self.processed_slot, slot.slot)
}
GeyserCommitmentLevel::Confirmed => {
updade("Confirmed", &mut self.confirmed_slot, slot.slot)
}
GeyserCommitmentLevel::Finalized => {
updade("Finalized", &mut self.finalized_slot, slot.slot)
}
}
}
}
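The new get_slot_with_commitment accessor is what the RPC side uses to answer getSlot for a given commitment level. A hedged usage sketch (the helper function is hypothetical; the default to confirmed mirrors the getSlot handler added in rpc.rs below):

use solana_sdk::commitment_config::CommitmentConfig;

// Resolve the slot a caller asked for, falling back to confirmed like the
// getSlot handler further down in this commit.
fn requested_slot(slots: &CurrentSlot, requested: Option<CommitmentConfig>) -> Slot {
    slots.get_slot_with_commitment(requested.unwrap_or(CommitmentConfig::confirmed()))
}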

View File

@@ -238,7 +238,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
println!("Channel error during sending back request status error:{err:?}");
}
log::info!("RPC bootstrap account sent");
}
},
_ => crate::rpc::server_rpc_request(req, &current_epoch_state),
}
},
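The new match arm routes every request that run_loop does not handle with a dedicated arm to the cached epoch state. A hedged sketch of the shape of that dispatch (channel and variable names are assumptions inferred from this diff, not the actual run_loop code):

async fn dispatch_requests(
    mut request_rx: tokio::sync::mpsc::Receiver<crate::rpc::Requests>,
    current_epoch_state: crate::epoch::CurrentEpochSlotState,
) {
    while let Some(req) = request_rx.recv().await {
        match req {
            // bootstrap/save-stake requests would keep their dedicated handling here ...
            // getEpochInfo and getSlot are answered straight from the cached state.
            _ => crate::rpc::server_rpc_request(req, &current_epoch_state),
        }
    }
}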

View File

@@ -1,13 +1,16 @@
use crate::epoch::CurrentEpochSlotState;
use crate::stakestore::StakeMap;
use crate::stakestore::StoredStake;
use crate::Slot;
use jsonrpsee_core::error::Error as JsonRpcError;
use jsonrpsee_proc_macros::rpc;
use jsonrpsee_server::{RpcModule, Server, ServerHandle};
use jsonrpsee::core::Error as JsonRpcError;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::{RpcModule, Server, ServerHandle};
use jsonrpsee::types::error::ErrorObjectOwned as JsonRpcErrorOwned;
use solana_client::rpc_config::RpcContextConfig;
use solana_client::rpc_response::RpcBlockhash;
use solana_client::rpc_response::RpcVoteAccountStatus;
use solana_rpc_client_api::response::Response as RpcResponse;
use solana_sdk::commitment_config::CommitmentConfig;
//use solana_rpc_client_api::response::Response as RpcResponse;
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -16,52 +19,146 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000";
const SERVER_ERROR_MSG: &str = "Internal server error";
pub type RpcResult<T> = std::result::Result<T, jsonrpsee_types::error::ErrorCode>;
//internal RPC access
#[derive(Debug, Error)]
pub enum ConsensusRpcError {
#[error("Error during during json RPC request receive '{0}'")]
JsonRpcError(#[from] JsonRpcError),
#[error("Error during channel send '{0}'")]
SendError(String),
#[error("Error during channel receive '{0}'")]
RcvError(#[from] tokio::sync::oneshot::error::RecvError),
#[error("Bad RPC service address '{0}'")]
AddressParseError(#[from] std::net::AddrParseError),
#[error("Custom Error '{0}'")]
Custom(String),
}
// Conversions for errors which occur in the context of a JSON-RPC method call.
// Crate-local error variants are converted to JSON-RPC errors which are
// then returned to the caller.
impl From<ConsensusRpcError> for JsonRpcErrorOwned {
fn from(err: ConsensusRpcError) -> Self {
match &err {
ConsensusRpcError::JsonRpcError(err_msg) => {
JsonRpcErrorOwned::owned(-32000, SERVER_ERROR_MSG, Some(err_msg.to_string()))
}
ConsensusRpcError::AddressParseError(err_msg) => {
JsonRpcErrorOwned::owned(-32001, SERVER_ERROR_MSG, Some(err_msg.to_string()))
}
ConsensusRpcError::RcvError(err_msg) => {
JsonRpcErrorOwned::owned(-32002, SERVER_ERROR_MSG, Some(err_msg.to_string()))
}
_ => todo!(),
}
}
}
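Note that the catch-all arm still hits todo!(), so a SendError or Custom error raised by the get_slot handler below would panic instead of reaching the client as a JSON-RPC error. A hedged sketch of arms that would cover them (the -32003 code is an arbitrary placeholder, not from this commit):

// Inside the match above, a possible replacement for the catch-all arm:
ConsensusRpcError::SendError(msg) | ConsensusRpcError::Custom(msg) => {
    JsonRpcErrorOwned::owned(-32003, SERVER_ERROR_MSG, Some(msg.to_string()))
}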
pub type Result<T> = std::result::Result<T, ConsensusRpcError>;
//public RPC access
#[rpc(server)]
pub trait ConsensusRpc {
#[method(name = "getLatestBlockhash")]
async fn get_latest_blockhash(
&self,
config: Option<RpcContextConfig>,
) -> RpcResult<RpcResponse<RpcBlockhash>>;
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash>;
#[method(name = "getSlot")]
async fn get_slot(&self, config: Option<RpcContextConfig>) -> RpcResult<RpcResponse<Slot>>;
async fn get_slot(&self, config: Option<RpcContextConfig>) -> Result<Slot>;
#[method(name = "getEpochInfo")]
async fn get_epoch_info(&self) -> RpcResult<RpcResponse<EpochInfo>>;
async fn get_epoch_info(&self) -> Result<EpochInfo>;
#[method(name = "getLeaderSchedule")]
async fn get_leader_schedule(
&self,
slot: Option<u64>,
) -> RpcResult<RpcResponse<Option<HashMap<String, Vec<usize>>>>>;
) -> Result<Option<HashMap<String, Vec<usize>>>>;
#[method(name = "getVoteAccounts")]
async fn get_vote_accounts(&self) -> RpcResult<RpcResponse<RpcVoteAccountStatus>>;
async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus>;
}
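Because the trait is served as plain JSON-RPC over HTTP, the new methods can be exercised with any HTTP client once the server at the end of this file is running. A hedged smoke-test sketch, not part of the commit; the URL assumes the 0.0.0.0:3000 bind address of PRIVATE_RPC_ADDRESS, and reqwest/serde_json come from the Cargo.toml above:

// Hypothetical client-side call to the new getEpochInfo method.
async fn fetch_epoch_info() -> anyhow::Result<String> {
    let body = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getEpochInfo",
        "params": []
    });
    let raw = reqwest::Client::new()
        .post("http://127.0.0.1:3000")
        .header("content-type", "application/json")
        .body(body.to_string())
        .send()
        .await?
        .text()
        .await?;
    Ok(raw)
}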
//internal RPC access
#[derive(Debug, Error)]
pub enum RpcError {
#[error("Error during during json RPC request receive '{0}'")]
JsonRpcError(#[from] JsonRpcError),
#[error("Bad RPC service address '{0}'")]
AddressParseError(#[from] std::net::AddrParseError),
pub struct RPCServer {
request_tx: Sender<Requests>,
}
//start RPC access
#[jsonrpsee::core::async_trait]
impl ConsensusRpcServer for RPCServer {
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> {
todo!()
}
async fn get_slot(&self, config: Option<RpcContextConfig>) -> Result<Slot> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.request_tx.send(Requests::Slot(tx, config)).await {
return Err(ConsensusRpcError::SendError(err.to_string()));
}
Ok(rx.await?.ok_or(ConsensusRpcError::Custom(
"No slot after min slot".to_string(),
))?)
}
async fn get_epoch_info(&self) -> Result<EpochInfo> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.request_tx.send(Requests::EpochInfo(tx)).await {
return Err(ConsensusRpcError::SendError(err.to_string()));
}
Ok(rx.await?)
}
async fn get_leader_schedule(
&self,
slot: Option<u64>,
) -> Result<Option<HashMap<String, Vec<usize>>>> {
todo!()
}
async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus> {
todo!()
}
}
pub enum Requests {
SaveStakes,
BootstrapAccounts(tokio::sync::oneshot::Sender<(StakeMap, Slot)>),
EpochInfo(tokio::sync::oneshot::Sender<EpochInfo>),
Slot(
tokio::sync::oneshot::Sender<Option<Slot>>,
Option<RpcContextConfig>,
),
}
pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHandle, RpcError> {
pub fn server_rpc_request(request: Requests, current_epoch_state: &CurrentEpochSlotState) {
match request {
crate::rpc::Requests::EpochInfo(tx) => {
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
log::warn!("Channel error during sending back request status error:{err:?}");
}
}
crate::rpc::Requests::Slot(tx, config) => {
let slot = config.and_then(|conf| {
let slot = current_epoch_state.current_slot.get_slot_with_commitment(
conf.commitment.unwrap_or(CommitmentConfig::confirmed()),
);
(slot >= conf.min_context_slot.unwrap_or(0)).then_some(slot)
});
if let Err(err) = tx.send(slot) {
log::warn!("Channel error during sending back request status error:{err:?}");
}
}
_ => unreachable!(),
}
}
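run_server below only shows the jsonrpsee Server being built; a hedged sketch of how the RPCServer defined above would typically be mounted on it (into_rpc is generated by the #[rpc(server)] macro; the exact wiring is an assumption, not shown in this hunk):

// Hypothetical registration inside run_server, after the builder call:
let rpc = RPCServer { request_tx };
let handle = server.start(rpc.into_rpc());
Ok(handle) // the ServerHandle returned to the caller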
//start private RPC access
pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHandle> {
let server = Server::builder()
.max_response_body_size(1048576000)
.build(PRIVATE_RPC_ADDRESS.parse::<SocketAddr>()?)