diff --git a/leadershedule/src/main.rs b/leadershedule/src/main.rs index 1747c8b..6e00173 100644 --- a/leadershedule/src/main.rs +++ b/leadershedule/src/main.rs @@ -1,4 +1,4 @@ -//cargo run 10 23 15 +//cargo run 1 10 23 15 for 1d 10h 23mn 15s use borsh::BorshDeserialize; use chrono::{Datelike, Local, NaiveDate, NaiveTime, Timelike}; @@ -36,20 +36,23 @@ pub async fn main() -> anyhow::Result<()> { std::process::exit(1); } - let target_hour: u32 = args[1] + let day: u64 = args[1] .parse() .expect("First argument should be a number representing the hour"); - let target_minute: u32 = args[2] + let hour: u64 = args[2] + .parse() + .expect("First argument should be a number representing the hour"); + let minute: u64 = args[3] .parse() .expect("Second argument should be a number representing the minute"); - let target_second: u32 = args[3] + let second: u64 = args[4] .parse() .expect("Third argument should be a number representing the seconds"); - let seconds_until_target = seconds_until_target_time(target_hour, target_minute, target_second); + let seconds_until_target = day * 24 * 3600 + hour * 3600 + minute * 60 + second; log::info!("seconds_until_target:{}", seconds_until_target); - let to_wait = Duration::from_secs((seconds_until_target as u64).saturating_sub(30)); - //tokio::time::sleep(to_wait).await; + let to_wait = Duration::from_secs(seconds_until_target as u64); + tokio::time::sleep(to_wait).await; let mut counter = 0; let mut schedule_counter = 0; @@ -82,10 +85,10 @@ pub async fn main() -> anyhow::Result<()> { async fn save_map(file_name: &str, map: &BTreeMap) -> anyhow::Result<()> { let serialized_map = serde_json::to_string(map).unwrap(); // Write to the file - //let mut file = File::create(file_name).await?; - //file.write_all(serialized_map.as_bytes()).await?; - log::info!("Files: {file_name}"); - log::info!("{}", serialized_map); + let mut file = File::create(file_name).await?; + file.write_all(serialized_map.as_bytes()).await?; + //log::info!("Files: {file_name}"); + //log::info!("{}", serialized_map); Ok(()) } @@ -366,26 +369,6 @@ fn seconds_until_target_time_with_time( duration_until_target.whole_seconds() } -fn seconds_until_target_time(target_hour: u32, target_minute: u32, target_second: u32) -> u64 { - let now = Local::now(); - log::info!("now:{now:?}"); - let today = now.date_naive(); - let target_naive_time = - NaiveTime::from_hms_opt(target_hour, target_minute, target_second).unwrap(); - let mut target_time = NaiveDate::and_time(&today, target_naive_time); - - // If the target time has passed for today, calculate for next day - if target_time < now.naive_local() { - target_time = NaiveDate::and_time(&(today + chrono::Duration::days(1)), target_naive_time); - } - - log::info!("target_time:{target_time:?}"); - let duration_until_target = target_time - .signed_duration_since(now.naive_local()) - .num_seconds() as u64; - duration_until_target -} - pub async fn verify_schedule( schedule: LeaderSchedule, node_vote_account_map: &HashMap, diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml index 71a465c..ceb89b5 100644 --- a/stake_aggregate/Cargo.toml +++ b/stake_aggregate/Cargo.toml @@ -19,6 +19,10 @@ tokio = { version = "1.*", features = ["full"] } reqwest = "0.11" serde = "1.0" serde_json = "1.0" +jsonrpsee = { version = "0.20.0", features = [] } +jsonrpsee-core = "0.20.0" +jsonrpsee-server = "0.20.0" +thiserror = "1.0.40" yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } diff --git a/stake_aggregate/src/leader_schedule.rs b/stake_aggregate/src/leader_schedule.rs index ed8061f..9c8f710 100644 --- a/stake_aggregate/src/leader_schedule.rs +++ b/stake_aggregate/src/leader_schedule.rs @@ -5,8 +5,10 @@ use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::epoch_info::EpochInfo; use solana_sdk::pubkey::Pubkey; +use solana_sdk::stake::state::Delegation; use std::collections::HashMap; use std::str::FromStr; +use std::time::Duration; const MAX_EPOCH_VALUE: u64 = 18446744073709551615; @@ -18,28 +20,40 @@ pub fn calculate_leader_schedule_from_stake_map( //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}"); for storestake in stake_map.values() { //log::info!("Program_accounts stake:{stake:#?}"); - //On test validator all stakes are attributes to an account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE. - //It's considered as activated stake. - if storestake.stake.activation_epoch == MAX_EPOCH_VALUE { - log::info!("Found account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE use it: {}", storestake.pubkey.to_string()); - } else { - // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of - // u64::MAX which indicates no activation ever happened) - if storestake.stake.activation_epoch >= current_epoch_info.epoch { - continue; - } - // Ignore stake accounts deactivated before this epoch - if storestake.stake.deactivation_epoch < current_epoch_info.epoch { - continue; - } + if is_stake_to_add(storestake.pubkey, &storestake.stake, ¤t_epoch_info) { + // Add the stake in this stake account to the total for the delegated-to vote account + *(stakes.entry(storestake.stake.voter_pubkey).or_insert(0)) += storestake.stake.stake; } - - // Add the stake in this stake account to the total for the delegated-to vote account - *(stakes.entry(storestake.stake.voter_pubkey).or_insert(0)) += storestake.stake.stake; } calculate_leader_schedule(stakes, current_epoch_info) } +fn is_stake_to_add( + stake_pubkey: Pubkey, + stake: &Delegation, + current_epoch_info: &EpochInfo, +) -> bool { + //On test validator all stakes are attributes to an account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE. + //It's considered as activated stake. + if stake.activation_epoch == MAX_EPOCH_VALUE { + log::info!( + "Found account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE use it: {}", + stake_pubkey.to_string() + ); + } else { + // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of + // u64::MAX which indicates no activation ever happened) + if stake.activation_epoch >= current_epoch_info.epoch { + return false; + } + // Ignore stake accounts deactivated before this epoch + if stake.deactivation_epoch < current_epoch_info.epoch { + return false; + } + } + true +} + //Copied from leader_schedule_utils.rs // Mostly cribbed from leader_schedule_utils fn calculate_leader_schedule( @@ -83,7 +97,11 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { } pub fn verify_schedule(schedule: LeaderSchedule, rpc_url: String) -> anyhow::Result<()> { - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url, + Duration::from_secs(600), + CommitmentConfig::confirmed(), + ); let Some(rpc_leader_schedule) = rpc_client.get_leader_schedule(None)? else { log::info!("verify_schedule RPC return no schedule. Try later."); return Ok(()); @@ -167,7 +185,10 @@ use std::fs::File; use std::io::Write; use std::time::{SystemTime, UNIX_EPOCH}; -fn save_schedule_on_file(name: &str, map: &HashMap>) -> anyhow::Result<()> { +pub fn save_schedule_on_file( + name: &str, + map: &HashMap, +) -> anyhow::Result<()> { let serialized_map = serde_json::to_string(map).unwrap(); let start = SystemTime::now(); @@ -192,15 +213,21 @@ fn save_schedule_on_file(name: &str, map: &HashMap>) -> anyho use borsh::BorshDeserialize; use solana_sdk::stake::state::StakeState; -fn print_current_program_account(rpc_client: &RpcClient) { - let mut stakes = HashMap::::new(); +pub fn build_current_stakes( + stake_map: &crate::stakestore::StakeMap, + current_epoch_info: &EpochInfo, + rpc_url: String, + commitment: CommitmentConfig, +) -> HashMap { // Fetch stakes in current epoch + let rpc_client = + RpcClient::new_with_timeout_and_commitment(rpc_url, Duration::from_secs(600), commitment); //CommitmentConfig::confirmed()); let response = rpc_client .get_program_accounts(&solana_sdk::stake::program::id()) .unwrap(); //log::trace!("get_program_accounts:{:?}", response); - + let mut stakes_aggregated = HashMap::::new(); for (pubkey, account) in response { // Zero-length accounts owned by the stake program are system accounts that were re-assigned and are to be // ignored @@ -210,16 +237,25 @@ fn print_current_program_account(rpc_client: &RpcClient) { match StakeState::deserialize(&mut account.data.as_slice()).unwrap() { StakeState::Stake(_, stake) => { - // Add the stake in this stake account to the total for the delegated-to vote account - log::info!( - "RPC Stake {pubkey} account:{account:?} stake:{stake:?} details:{stake:?}" - ); - *(stakes - .entry(stake.delegation.voter_pubkey.clone()) - .or_insert(0)) += stake.delegation.stake; + if is_stake_to_add(pubkey, &stake.delegation, current_epoch_info) { + // Add the stake in this stake account to the total for the delegated-to vote account + log::info!( + "RPC Stake {pubkey} account:{account:?} stake:{stake:?} details:{stake:?}" + ); + (stakes_aggregated + .entry(stake.delegation.voter_pubkey.to_string()) + .or_insert((0, 0))) + .0 += stake.delegation.stake; + } } _ => (), } } - log::info!("RPC Current stakes:{stakes:?}"); + stake_map.iter().for_each(|(_, stake)| { + (stakes_aggregated + .entry(stake.stake.voter_pubkey.to_string()) + .or_insert((0, 0))) + .0 += stake.stake.stake; + }); + stakes_aggregated } diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 9b6dec1..9380e04 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -28,7 +28,7 @@ use yellowstone_grpc_proto::{ }; mod leader_schedule; -//mod rpc; +mod rpc; mod stakestore; type Slot = u64; @@ -150,8 +150,27 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //log current data at interval let mut log_interval = tokio::time::interval(Duration::from_millis(600000)); + //start local rpc access to execute command. + let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(100); + let rpc_handle = crate::rpc::run_server(request_tx).await?; + //make it run forever + tokio::spawn(rpc_handle.stopped()); + loop { tokio::select! { + _ = request_rx.recv() => { + tokio::task::spawn_blocking({ + let current_stakes = stakestore.get_cloned_stake_map(); + let move_epoch = current_epoch.clone(); + move || { + let current_stake = crate::leader_schedule::build_current_stakes(¤t_stakes, &move_epoch, RPC_URL.to_string(), CommitmentConfig::confirmed()); + if let Err(err) = crate::leader_schedule::save_schedule_on_file("stakes", ¤t_stake) { + log::error!("Error during current stakes saving:{err}"); + } + + } + }); + }, //log interval _ = log_interval.tick() => { log::info!("Run_loop update new epoch:{current_epoch:?} current slot:{current_slot:?} next epoch start slot:{next_epoch_start_slot}"); @@ -175,7 +194,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re log::info!("TaskToExec RpcGetCurrentEpoch start"); //wait 1 sec to be sure RPC change epoch tokio::time::sleep(Duration::from_secs(1)).await; - let rpc_client = RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized()); + let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized()); let res = rpc_client.get_epoch_info().await; TaskResult::CurrentEpoch(res) } diff --git a/stake_aggregate/src/rpc.rs b/stake_aggregate/src/rpc.rs index d9e1097..b7743dd 100644 --- a/stake_aggregate/src/rpc.rs +++ b/stake_aggregate/src/rpc.rs @@ -1,132 +1,42 @@ -use reqwest; -use serde::Deserialize; -use serde_json::json; -use solana_rpc_client_api::{ - config::RpcProgramAccountsConfig, - response::{OptionalContext, RpcKeyedAccount}, -}; -use solana_sdk::account::Account; -use solana_sdk::commitment_config::CommitmentConfig; -use solana_sdk::pubkey::Pubkey; -use tokio::time::Duration; +use jsonrpsee_core::error::Error as JsonRpcError; +//use jsonrpsee_http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; +use jsonrpsee_server::{RpcModule, Server, ServerHandle}; +use std::net::SocketAddr; +use thiserror::Error; +use tokio::sync::mpsc::Sender; -#[derive(Debug, Deserialize)] -struct RpcResponse { - jsonrpc: String, - result: T, - id: i32, +const RPC_ADDRESS: &str = "0.0.0.0:3000"; + +#[derive(Debug, Error)] +pub enum RpcError { + #[error("Error during during json RPC request receive '{0}'")] + JsonRpcError(#[from] JsonRpcError), + + #[error("Bad RPC service address '{0}'")] + AddressParseError(#[from] std::net::AddrParseError), } -// #[derive(Debug, Deserialize)] -// struct ProgramAccount { -// pubkey: String, -// account: UiAccount, -// } +//start RPC access -pub async fn get_program_accounts( - url: &str, - program_id: &Pubkey, -) -> Result, String> { - let mut default_headers = reqwest::header::HeaderMap::new(); - default_headers.append( - reqwest::header::HeaderName::from_static("solana-client"), - reqwest::header::HeaderValue::from_str( - format!("rust/{}", solana_version::Version::default()).as_str(), - ) - .unwrap(), - ); - let client = reqwest::Client::builder() - .default_headers(default_headers) - .timeout(Duration::from_secs(600)) // 10 minutes in seconds - .build() - .map_err(|err| format!("{err}"))?; - - let mut config = RpcProgramAccountsConfig::default(); - let commitment = CommitmentConfig::confirmed(); - config.account_config.commitment = Some(commitment); - - let payload = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "getProgramAccounts", - "params": [ - program_id.to_string(), - config - ] - }); - - log::info!("{payload}"); - - // let payload = json!([pubkey.to_string(), config]) - - let resp = client - .post(url) - .json(&payload) - .send() - .await - .map_err(|err| format!("{err}"))?; - - log::info!("{resp:?}"); - log::info!("{:?}", resp.text().await); - - // let resp: RpcResponse> = - // resp.json().await.map_err(|err| format!("{err}"))?; - - // let accounts = parse_keyed_accounts(resp.result)?; - // Ok(accounts) - Ok(vec![]) +pub enum Requests { + SaveStakes, } -fn parse_keyed_accounts(accounts: Vec) -> Result, String> { - let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::with_capacity(accounts.len()); - for RpcKeyedAccount { pubkey, account } in accounts.into_iter() { - let pubkey = pubkey.parse().map_err(|err| format!("{err}"))?; - pubkey_accounts.push(( - pubkey, - account - .decode() - .ok_or_else(|| "Parse error for accour from rpc".to_string())?, - )); - } - Ok(pubkey_accounts) +pub(crate) async fn run_server(request_tx: Sender) -> Result { + let server = Server::builder() + .build(RPC_ADDRESS.parse::()?) + .await?; + let mut module = RpcModule::new(request_tx); + + //register start Batch Tx send entry point + module.register_async_method("save_stakes", |_params, request_tx| async move { + log::trace!("RPC save_stakes"); + request_tx + .send(Requests::SaveStakes) + .await + .map(|_| "Get save_stakes status successfully".to_string()) + .unwrap_or_else(|_| "error during request execution".to_string()) + })?; + let server_handle = server.start(module); + Ok(server_handle) } - -// pub async fn send(&self, request: RpcRequest, params: Value) -> ClientResult -// where -// T: serde::de::DeserializeOwned, -// { -// assert!(params.is_array() || params.is_null()); - -// let response = self -// .sender -// .send(request, params) -// .await -// .map_err(|err| err.into_with_request(request))?; -// serde_json::from_value(response) -// .map_err(|err| ClientError::new_with_request(err.into(), request)) -// } - -// pub async fn get_program_accounts_with_config( -// &self, -// pubkey: &Pubkey, -// mut config: RpcProgramAccountsConfig, -// ) -> ClientResult> { -// let commitment = config -// .account_config -// .commitment -// .unwrap_or_else(|| self.commitment()); -// let commitment = self.maybe_map_commitment(commitment).await?; -// config.account_config.commitment = Some(commitment); -// if let Some(filters) = config.filters { -// config.filters = Some(self.maybe_map_filters(filters).await?); -// } - -// let accounts = self -// .send::>>( -// RpcRequest::GetProgramAccounts, -// json!([pubkey.to_string(), config]), -// ) -// .await? -// .parse_value(); -// parse_keyed_accounts(accounts, RpcRequest::GetProgramAccounts) -// } diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index dee4eb1..30b4395 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -42,7 +42,7 @@ fn stake_map_insert_stake(map: &mut StakeMap, stake_account: Pubkey, stake: Stor }; } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct StoredStake { pub pubkey: Pubkey, pub stake: Delegation, @@ -70,6 +70,10 @@ impl StakeStore { self.stakes.len() } + pub fn get_cloned_stake_map(&self) -> StakeMap { + self.stakes.clone() + } + //return the contained stake map to do an external update. // During extract period (between extract and merge) added stake a stored to be processed later. //if the store is already extracted return an error.