From f784f7673d853ba10b57d5e798da767349b2bc12 Mon Sep 17 00:00:00 2001
From: musitdev
Date: Tue, 19 Sep 2023 18:30:02 +0200
Subject: [PATCH] add epoch_info and slot RPC call

---
 stake_aggregate/Cargo.toml   |   8 +-
 stake_aggregate/src/epoch.rs |  28 +++++--
 stake_aggregate/src/main.rs  |   3 +-
 stake_aggregate/src/rpc.rs   | 143 +++++++++++++++++++++++++++++------
 4 files changed, 147 insertions(+), 35 deletions(-)

diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml
index 7a85f10..39ceeec 100644
--- a/stake_aggregate/Cargo.toml
+++ b/stake_aggregate/Cargo.toml
@@ -25,10 +25,10 @@ reqwest = "0.11"
 serde = "1.0"
 serde_json = "1.0"
 jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] }
-jsonrpsee-core = "0.20.0"
-jsonrpsee-server = "0.20.0"
-jsonrpsee-proc-macros = "0.20.0"
-jsonrpsee-types = "0.20.0"
+#jsonrpsee-core = "0.20.0"
+#jsonrpsee-server = "0.20.0"
+#jsonrpsee-proc-macros = "0.20.0"
+#jsonrpsee-types = "0.20.0"
 thiserror = "1.0.40"
 yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" }
 
diff --git a/stake_aggregate/src/epoch.rs b/stake_aggregate/src/epoch.rs
index 284b8fb..7d1c4d6 100644
--- a/stake_aggregate/src/epoch.rs
+++ b/stake_aggregate/src/epoch.rs
@@ -2,9 +2,9 @@ use crate::leader_schedule::LeaderScheduleEvent;
 use crate::Slot;
 use solana_client::client_error::ClientError;
 use solana_client::nonblocking::rpc_client::RpcClient;
-use solana_sdk::commitment_config::CommitmentConfig;
+use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
 use solana_sdk::epoch_info::EpochInfo;
-use yellowstone_grpc_proto::geyser::CommitmentLevel;
+use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
 use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
 
 #[derive(Debug)]
@@ -38,7 +38,7 @@ impl CurrentEpochSlotState {
         &mut self,
         new_slot: &SubscribeUpdateSlot,
     ) -> Option<LeaderScheduleEvent> {
-        if let CommitmentLevel::Confirmed = new_slot.status() {
+        if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
             //for the first update of slot correct epoch info data.
             if self.current_slot.confirmed_slot == 0 {
                 let diff = new_slot.slot - self.current_epoch.absolute_slot;
@@ -82,7 +82,7 @@ impl CurrentEpochSlotState {
         //update slot state for all commitment.
         self.current_slot.update_slot(&new_slot);
 
-        if let CommitmentLevel::Confirmed = new_slot.status() {
+        if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
             self.manage_change_epoch()
         } else {
             None
@@ -128,6 +128,14 @@ pub struct CurrentSlot {
 }
 
 impl CurrentSlot {
+    pub fn get_slot_with_commitment(&self, commitment: CommitmentConfig) -> Slot {
+        match commitment.commitment {
+            CommitmentLevel::Processed => self.processed_slot,
+            CommitmentLevel::Confirmed => self.confirmed_slot,
+            CommitmentLevel::Finalized => self.finalized_slot,
+        }
+    }
+
     fn update_slot(&mut self, slot: &SubscribeUpdateSlot) {
         let updade = |commitment: &str, current_slot: &mut u64, new_slot: u64| {
             //verify that the slot is consecutif
@@ -142,9 +150,15 @@ impl CurrentSlot {
         };
 
         match slot.status() {
-            CommitmentLevel::Processed => updade("Processed", &mut self.processed_slot, slot.slot),
-            CommitmentLevel::Confirmed => updade("Confirmed", &mut self.confirmed_slot, slot.slot),
-            CommitmentLevel::Finalized => updade("Finalized", &mut self.finalized_slot, slot.slot),
+            GeyserCommitmentLevel::Processed => {
+                updade("Processed", &mut self.processed_slot, slot.slot)
+            }
+            GeyserCommitmentLevel::Confirmed => {
+                updade("Confirmed", &mut self.confirmed_slot, slot.slot)
+            }
+            GeyserCommitmentLevel::Finalized => {
+                updade("Finalized", &mut self.finalized_slot, slot.slot)
+            }
         }
     }
 }
diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs
index 17841e0..2b42414 100644
--- a/stake_aggregate/src/main.rs
+++ b/stake_aggregate/src/main.rs
@@ -238,7 +238,8 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re
                         println!("Channel error during sending bacl request status error:{err:?}");
                     }
                     log::info!("RPC bootstrap account send");
-                }
+                },
+                _ => crate::rpc::server_rpc_request(req, &current_epoch_state),
             }
         },
diff --git a/stake_aggregate/src/rpc.rs b/stake_aggregate/src/rpc.rs
index 999d69c..4e1de43 100644
--- a/stake_aggregate/src/rpc.rs
+++ b/stake_aggregate/src/rpc.rs
@@ -1,13 +1,16 @@
+use crate::epoch::CurrentEpochSlotState;
 use crate::stakestore::StakeMap;
 use crate::stakestore::StoredStake;
 use crate::Slot;
-use jsonrpsee_core::error::Error as JsonRpcError;
-use jsonrpsee_proc_macros::rpc;
-use jsonrpsee_server::{RpcModule, Server, ServerHandle};
+use jsonrpsee::core::Error as JsonRpcError;
+use jsonrpsee::proc_macros::rpc;
+use jsonrpsee::server::{RpcModule, Server, ServerHandle};
+use jsonrpsee::types::error::ErrorObjectOwned as JsonRpcErrorOwned;
 use solana_client::rpc_config::RpcContextConfig;
 use solana_client::rpc_response::RpcBlockhash;
 use solana_client::rpc_response::RpcVoteAccountStatus;
-use solana_rpc_client_api::response::Response as RpcResponse;
+use solana_sdk::commitment_config::CommitmentConfig;
+//use solana_rpc_client_api::response::Response as RpcResponse;
 use solana_sdk::epoch_info::EpochInfo;
 use std::collections::HashMap;
 use std::net::SocketAddr;
@@ -16,52 +19,146 @@ use tokio::sync::mpsc::Sender;
 use tokio::sync::oneshot;
 
 const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000";
+const SERVER_ERROR_MSG: &str = "Internal server error";
 
-pub type RpcResult<T> = std::result::Result<T, JsonRpcError>;
+//internal RPC access
+#[derive(Debug, Error)]
+pub enum ConsensusRpcError {
+    #[error("Error during json RPC request receive '{0}'")]
+    JsonRpcError(#[from] JsonRpcError),
+
+    #[error("Error during channel send '{0}'")]
+    SendError(String),
+
+    #[error("Error during channel receive '{0}'")]
+    RcvError(#[from] tokio::sync::oneshot::error::RecvError),
+
+    #[error("Bad RPC service address '{0}'")]
+    AddressParseError(#[from] std::net::AddrParseError),
+
+    #[error("Custom Error '{0}'")]
+    Custom(String),
+}
+
+// Conversions for errors which occur in the context of a JSON-RPC method call.
+// Crate-local error variants are converted to JSON-RPC errors which are
+// then returned to the caller.
+impl From<ConsensusRpcError> for JsonRpcErrorOwned {
+    fn from(err: ConsensusRpcError) -> Self {
+        match &err {
+            ConsensusRpcError::JsonRpcError(err_msg) => {
+                JsonRpcErrorOwned::owned(-32000, SERVER_ERROR_MSG, Some(err_msg.to_string()))
+            }
+            ConsensusRpcError::AddressParseError(err_msg) => {
+                JsonRpcErrorOwned::owned(-32001, SERVER_ERROR_MSG, Some(err_msg.to_string()))
+            }
+            ConsensusRpcError::RcvError(err_msg) => {
+                JsonRpcErrorOwned::owned(-32002, SERVER_ERROR_MSG, Some(err_msg.to_string()))
+            }
+            _ => todo!(),
+        }
+    }
+}
+
+pub type Result<T> = std::result::Result<T, ConsensusRpcError>;
 
 //public RPC access
 #[rpc(server)]
 pub trait ConsensusRpc {
     #[method(name = "getLatestBlockhash")]
-    async fn get_latest_blockhash(
-        &self,
-        config: Option<RpcContextConfig>,
-    ) -> RpcResult<RpcResponse<RpcBlockhash>>;
+    async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash>;
 
     #[method(name = "getSlot")]
-    async fn get_slot(&self, config: Option<RpcContextConfig>) -> RpcResult<RpcResponse<Slot>>;
+    async fn get_slot(&self, config: Option<RpcContextConfig>) -> Result<Slot>;
 
     #[method(name = "getEpochInfo")]
-    async fn get_epoch_info(&self) -> RpcResult<RpcResponse<EpochInfo>>;
+    async fn get_epoch_info(&self) -> Result<EpochInfo>;
 
     #[method(name = "getLeaderSchedule")]
     async fn get_leader_schedule(
         &self,
         slot: Option<Slot>,
-    ) -> RpcResult<RpcResponse<Option<HashMap<String, Vec<usize>>>>>;
+    ) -> Result<Option<HashMap<String, Vec<usize>>>>;
 
     #[method(name = "getVoteAccounts")]
-    async fn get_vote_accounts(&self) -> RpcResult<RpcResponse<RpcVoteAccountStatus>>;
+    async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus>;
 }
 
-//internal RPC access
-#[derive(Debug, Error)]
-pub enum RpcError {
-    #[error("Error during during json RPC request receive '{0}'")]
-    JsonRpcError(#[from] JsonRpcError),
-
-    #[error("Bad RPC service address '{0}'")]
-    AddressParseError(#[from] std::net::AddrParseError),
+pub struct RPCServer {
+    request_tx: Sender<Requests>,
 }
 
-//start RPC access
+#[jsonrpsee::core::async_trait]
+impl ConsensusRpcServer for RPCServer {
+    async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> {
+        todo!()
+    }
+
+    async fn get_slot(&self, config: Option<RpcContextConfig>) -> Result<Slot> {
+        let (tx, rx) = oneshot::channel();
+        if let Err(err) = self.request_tx.send(Requests::Slot(tx, config)).await {
+            return Err(ConsensusRpcError::SendError(err.to_string()));
+        }
+        Ok(rx.await?.ok_or(ConsensusRpcError::Custom(
+            "No slot after min slot".to_string(),
+        ))?)
+    }
+
+    async fn get_epoch_info(&self) -> Result<EpochInfo> {
+        let (tx, rx) = oneshot::channel();
+        if let Err(err) = self.request_tx.send(Requests::EpochInfo(tx)).await {
+            return Err(ConsensusRpcError::SendError(err.to_string()));
+        }
+        Ok(rx.await?)
+    }
+
+    async fn get_leader_schedule(
+        &self,
+        slot: Option<Slot>,
+    ) -> Result<Option<HashMap<String, Vec<usize>>>> {
+        todo!()
+    }
+
+    async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus> {
+        todo!()
+    }
+}
 
 pub enum Requests {
     SaveStakes,
     BootstrapAccounts(tokio::sync::oneshot::Sender<(StakeMap, Slot)>),
+    EpochInfo(tokio::sync::oneshot::Sender<EpochInfo>),
+    Slot(
+        tokio::sync::oneshot::Sender<Option<Slot>>,
+        Option<RpcContextConfig>,
+    ),
 }
 
-pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHandle, RpcError> {
+pub fn server_rpc_request(request: Requests, current_epoch_state: &CurrentEpochSlotState) {
+    match request {
+        crate::rpc::Requests::EpochInfo(tx) => {
+            if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
+                log::warn!("Channel error during sending back request status error:{err:?}");
+            }
+        }
+        crate::rpc::Requests::Slot(tx, config) => {
+            let slot = config.and_then(|conf| {
+                let slot = current_epoch_state.current_slot.get_slot_with_commitment(
+                    conf.commitment.unwrap_or(CommitmentConfig::confirmed()),
+                );
+                (slot >= conf.min_context_slot.unwrap_or(0)).then_some(slot)
+            });
+            if let Err(err) = tx.send(slot) {
+                log::warn!("Channel error during sending back request status error:{err:?}");
+            }
+        }
+        _ => unreachable!(),
+    }
+}
+
+//start private RPC access
+
+pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHandle> {
     let server = Server::builder()
         .max_response_body_size(1048576000)
         .build(PRIVATE_RPC_ADDRESS.parse::<SocketAddr>()?)
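
Note (not part of the patch): a minimal client-side sketch of how the new getEpochInfo and getSlot methods could be exercised once run_server is listening on PRIVATE_RPC_ADDRESS. It assumes jsonrpsee's HTTP client is available (for example through the crate's existing "full" feature), that the service is reachable on 127.0.0.1:3000, and that the crate's existing tokio/anyhow dependencies are used; the program itself is illustrative only.

// Illustrative check of the endpoints added above; not part of the patch.
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;
use solana_client::rpc_config::RpcContextConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // PRIVATE_RPC_ADDRESS binds 0.0.0.0:3000, so loopback works for a local test.
    let client = HttpClientBuilder::default().build("http://127.0.0.1:3000")?;

    // getEpochInfo takes no parameters and returns the aggregator's cached EpochInfo.
    let epoch: EpochInfo = client.request("getEpochInfo", rpc_params![]).await?;
    println!("epoch {} (absolute slot {})", epoch.epoch, epoch.absolute_slot);

    // getSlot takes an optional RpcContextConfig; server_rpc_request answers with
    // the slot matching the requested commitment (confirmed here).
    let config = RpcContextConfig {
        commitment: Some(CommitmentConfig::confirmed()),
        ..RpcContextConfig::default()
    };
    let slot: u64 = client.request("getSlot", rpc_params![config]).await?;
    println!("confirmed slot {slot}");

    Ok(())
}

An explicit commitment is passed because server_rpc_request only resolves the Slot request through config.and_then, so a call without a config would come back as the "No slot after min slot" error.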