diff --git a/stake_aggregate/src/leader_schedule.rs b/stake_aggregate/src/leader_schedule.rs index 7f5cb68..72e88a8 100644 --- a/stake_aggregate/src/leader_schedule.rs +++ b/stake_aggregate/src/leader_schedule.rs @@ -6,9 +6,8 @@ use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::epoch_info::EpochInfo; use solana_sdk::pubkey::Pubkey; use solana_sdk::stake::state::Delegation; -use std::collections::BTreeSet; +use std::collections::BTreeMap; use std::collections::HashMap; -use std::str::FromStr; use std::time::Duration; const MAX_EPOCH_VALUE: u64 = 18446744073709551615; @@ -188,7 +187,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; pub fn save_schedule_on_file( name: &str, - map: &HashMap, + map: &BTreeMap, ) -> anyhow::Result<()> { let serialized_map = serde_json::to_string(map).unwrap(); @@ -219,7 +218,7 @@ pub fn build_current_stakes( current_epoch_info: &EpochInfo, rpc_url: String, commitment: CommitmentConfig, -) -> HashMap { +) -> BTreeMap { // Fetch stakes in current epoch let rpc_client = RpcClient::new_with_timeout_and_commitment(rpc_url, Duration::from_secs(600), commitment); //CommitmentConfig::confirmed()); @@ -228,7 +227,8 @@ pub fn build_current_stakes( .unwrap(); //log::trace!("get_program_accounts:{:?}", response); - let mut stakes_aggregated = HashMap::::new(); + //use btreemap to always sort the same way. + let mut stakes_aggregated = BTreeMap::::new(); for (pubkey, account) in response { // Zero-length accounts owned by the stake program are system accounts that were re-assigned and are to be // ignored @@ -250,19 +250,22 @@ pub fn build_current_stakes( _ => (), } } - stake_map.iter().for_each(|(pubkey, stake)| { - log::trace!( - "LCOAL Stake {pubkey} account:{:?} stake:{stake:?}", - stake.stake.voter_pubkey - ); - (stakes_aggregated - .entry(stake.stake.voter_pubkey.to_string()) - .or_insert((0, 0))) - .1 += stake.stake.stake; - }); + stake_map + .iter() + .filter(|(pubkey, stake)| is_stake_to_add(**pubkey, &stake.stake, current_epoch_info)) + .for_each(|(pubkey, stake)| { + log::trace!( + "LCOAL Stake {pubkey} account:{:?} stake:{stake:?}", + stake.stake.voter_pubkey + ); + (stakes_aggregated + .entry(stake.stake.voter_pubkey.to_string()) + .or_insert((0, 0))) + .1 += stake.stake.stake; + }); //verify the list - let diff_list: HashMap = stakes_aggregated + let diff_list: BTreeMap = stakes_aggregated .iter() .filter(|(_, (rpc, local))| rpc != local) .map(|(pk, vals)| (pk.clone(), vals.clone())) diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 6806f7c..5879180 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -10,6 +10,9 @@ } ' */ + +//TODO: add stake verify that it' not already desactivated. + use crate::stakestore::extract_stakestore; use crate::stakestore::merge_stakestore; use crate::stakestore::StakeStore; @@ -108,7 +111,8 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re }; let mut next_epoch_start_slot = current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot; - log::info!("Run_loop init current_epoch:{current_epoch:?}"); + let mut current_epoch_start_slot = current_epoch.absolute_slot - current_epoch.slot_index; + log::info!("Run_loop init {current_epoch_start_slot} {next_epoch_start_slot} current_epoch:{current_epoch:?}"); let mut spawned_task_toexec = FuturesUnordered::new(); let mut spawned_task_result = FuturesUnordered::new(); @@ -208,7 +212,6 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized()); let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await; log::info!("TaskToExec RpcGetPa END"); - //let res = crate::rpc::get_program_accounts(RPC_URL, &solana_sdk::stake::program::id()).await; TaskResult::RpcGetPa(res) }, TaskToExec::RpcGetCurrentEpoch => { @@ -236,8 +239,9 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //merge new PA with stake map in a specific thread log::trace!("Run_loop before Program account stake merge START"); - let jh = tokio::task::spawn_blocking(|| { - crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list); + let jh = tokio::task::spawn_blocking(move || { + //update pa_list to set slot update to start epoq one. + crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list, current_epoch_start_slot); TaskResult::MergePAList(stake_map) }); spawned_task_result.push(jh); @@ -386,7 +390,8 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re current_slot.update_slot(&slot); if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slot can be non consecutif. - log::info!("End epoch slot. Calculate schedule at current slot:{}", current_slot.confirmed_slot); + + log::info!("End epoch slot, change epoch. Calculate schedule at current slot:{}", current_slot.confirmed_slot); let Ok(stake_map) = extract_stakestore(&mut stakestore) else { log::info!("Epoch schedule aborted because a getPA is currently running."); continue; @@ -395,6 +400,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re //change epoch. Change manually then update using RPC. current_epoch.epoch +=1; current_epoch.slot_index = 1; + current_epoch_start_slot = next_epoch_start_slot; next_epoch_start_slot = next_epoch_start_slot + current_epoch.slots_in_epoch; //set to next epochs. log::info!("End slot epoch update calculated next epoch:{current_epoch:?}"); @@ -412,7 +418,8 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re spawned_task_result.push(jh); spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch)); - + //reload current Stake account a epoch change to synchronize. + spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa)); } } diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index 30b4395..d40a9b1 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -142,6 +142,7 @@ impl StakeStore { pub fn merge_program_account_in_strake_map( stake_map: &mut StakeMap, pa_list: Vec<(Pubkey, Account)>, + last_update_slot: Slot, ) { pa_list .into_iter() @@ -158,7 +159,7 @@ pub fn merge_program_account_in_strake_map( let stake = StoredStake { pubkey: pk, stake: delegated_stake, - last_update_slot: 0, + last_update_slot, write_version: 0, }; stake_map_insert_stake(stake_map, pk, stake);