From 4a0f3e510f386b50f1d83a3446990973e7b08f8e Mon Sep 17 00:00:00 2001 From: musitdev Date: Wed, 18 Oct 2023 12:58:54 +0200 Subject: [PATCH] rework stake history sync algo --- stake_aggregate/Cargo.toml | 11 ++++- stake_aggregate/bin/stakehistory.rs | 30 +++++++++++++ stake_aggregate/src/bootstrap.rs | 20 +++++++++ stake_aggregate/src/epoch.rs | 10 +++-- stake_aggregate/src/leader_schedule.rs | 61 +++++++++++++++++++------- stake_aggregate/src/main.rs | 1 + stake_aggregate/src/stakestore.rs | 1 + 7 files changed, 113 insertions(+), 21 deletions(-) create mode 100644 stake_aggregate/bin/stakehistory.rs diff --git a/stake_aggregate/Cargo.toml b/stake_aggregate/Cargo.toml index f0d9034..3eb3f0f 100644 --- a/stake_aggregate/Cargo.toml +++ b/stake_aggregate/Cargo.toml @@ -15,6 +15,10 @@ path = "bin/parse_validator_stake.rs" name = "readstakes" path = "bin/read_stake_export.rs" +[[bin]] +name = "stakehistory" +path = "bin/stakehistory.rs" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -43,8 +47,8 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] } #jsonrpsee-types = "0.20.0" thiserror = "1.0.40" -yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } -yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } +#yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } +#yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } #yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } @@ -53,6 +57,9 @@ yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto #yellowstone-grpc-client = "1.11.0+solana.1.16.14" #yellowstone-grpc-proto = "1.10.0+solana.1.16.14" +yellowstone-grpc-client = "1.9.0" +yellowstone-grpc-proto = "1.9.0" + solana-sdk = "1.16.14" solana-client = "1.16.14" solana-ledger = "1.16.14" diff --git a/stake_aggregate/bin/stakehistory.rs b/stake_aggregate/bin/stakehistory.rs new file mode 100644 index 0000000..e9e938e --- /dev/null +++ b/stake_aggregate/bin/stakehistory.rs @@ -0,0 +1,30 @@ +use solana_client::client_error::ClientError; +use solana_client::rpc_client::RpcClient; +use solana_sdk::account::Account; +use solana_sdk::account::AccountSharedData; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::stake_history::StakeHistory; +use std::time::Duration; + +const RPC_URL: &str = "http://147.28.169.13:8899"; + +pub fn get_stakehistory_account(rpc_url: String) -> Result { + log::info!("TaskToExec RpcGetStakeHistory start"); + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url, + Duration::from_secs(600), + CommitmentConfig::finalized(), + ); + let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id()); + log::info!("TaskToExec RpcGetStakeHistory END",); + res_stake +} + +fn main() { + let history = get_stakehistory_account(RPC_URL.to_string()) + .ok() + .and_then(|account| { + solana_sdk::account::from_account::(&AccountSharedData::from(account)) + }); + println!("RPC Stake_history: {:?}", history); +} diff --git a/stake_aggregate/src/bootstrap.rs b/stake_aggregate/src/bootstrap.rs index bb7968a..5b51b2f 100644 --- a/stake_aggregate/src/bootstrap.rs +++ b/stake_aggregate/src/bootstrap.rs @@ -78,6 +78,7 @@ pub struct BootstrapData { pub done: bool, pub sleep_time: u64, pub rpc_url: String, + pub current_epoch: u64, } fn process_bootstrap_event( @@ -135,6 +136,25 @@ fn process_bootstrap_event( log::error!("Bootstrap error, can't read stake history."); } + log::info!( + "Bootstrap epoch:{} stake history:{:?}", + data.current_epoch, + stake_history, + ); + + //verify that the history is up to date otherwise exit + //because schedule will be false. + if stake_history + .as_ref() + .and_then(|h| h.get(data.current_epoch - 1)) + .is_none() + { + panic!( + "Bootstrap stake history not up to date for epoch:{} exit", + data.current_epoch + ); + } + //merge new PA with stake map and vote map in a specific task let jh = tokio::task::spawn_blocking({ move || { diff --git a/stake_aggregate/src/epoch.rs b/stake_aggregate/src/epoch.rs index f5a2bce..bd03108 100644 --- a/stake_aggregate/src/epoch.rs +++ b/stake_aggregate/src/epoch.rs @@ -113,15 +113,19 @@ impl CurrentEpochSlotState { } pub async fn bootstrap(rpc_url: String) -> anyhow::Result { - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized()); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?; + let confirmed_slot = rpc_client.get_slot().await.unwrap_or(0); - Ok(CurrentEpochSlotState { + let mut state = CurrentEpochSlotState { current_slot: CurrentSlot::default(), epoch_cache, current_epoch_value: Epoch::default(), - }) + }; + state.current_slot.confirmed_slot = confirmed_slot; + state.current_epoch_value = state.epoch_cache.get_epoch_at_slot(confirmed_slot); + Ok(state) } // pub fn current_epoch_start_slot(&self) -> Slot { diff --git a/stake_aggregate/src/leader_schedule.rs b/stake_aggregate/src/leader_schedule.rs index a6e796b..74a8bb4 100644 --- a/stake_aggregate/src/leader_schedule.rs +++ b/stake_aggregate/src/leader_schedule.rs @@ -231,16 +231,16 @@ fn process_leadershedule_event( let epoch_schedule = Arc::clone(&epoch_schedule); let jh = tokio::task::spawn_blocking({ move || { - let next_epoch = current_epoch + 1; let epoch_vote_stakes = calculate_epoch_stakes( &stake_map, &vote_map, current_epoch, - next_epoch, stake_history.as_mut(), &epoch_schedule, ); + //calculate the schedule for next epoch using the current epoch start stake. + let next_epoch = current_epoch + 1; let leader_schedule = calculate_leader_schedule( &epoch_vote_stakes, next_epoch, @@ -320,24 +320,47 @@ fn process_leadershedule_event( } } +// /// get the epoch for which the given slot should save off +// /// information about stakers +// pub fn get_leader_schedule_epoch(&self, slot: Slot) -> Epoch { +// if slot < self.first_normal_slot { +// // until we get to normal slots, behave as if leader_schedule_slot_offset == slots_per_epoch +// self.get_epoch_and_slot_index(slot).0.saturating_add(1) +// } else { +// let new_slots_since_first_normal_slot = slot.saturating_sub(self.first_normal_slot); +// let new_first_normal_leader_schedule_slot = +// new_slots_since_first_normal_slot.saturating_add(self.leader_schedule_slot_offset); +// let new_epochs_since_first_normal_leader_schedule = +// new_first_normal_leader_schedule_slot +// .checked_div(self.slots_per_epoch) +// .unwrap_or(0); +// self.first_normal_epoch +// .saturating_add(new_epochs_since_first_normal_leader_schedule) +// } +// } + fn calculate_epoch_stakes( stake_map: &StakeMap, vote_map: &VoteMap, - current_epoch: u64, - next_epoch: u64, + new_epoch: u64, mut stake_history: Option<&mut StakeHistory>, epoch_schedule: &EpochSchedule, ) -> HashMap)> { //update stake history with current end epoch stake values. let new_rate_activation_epoch = FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule); + + log::info!("calculate_epoch_stakes new_epoch:{new_epoch}",); + + let ended_epoch = new_epoch - 1; + //update stake history for the ended epoch. let stake_history_entry = stake_map .values() .fold(StakeActivationStatus::default(), |acc, stake_account| { let delegation = stake_account.stake; acc + delegation.stake_activating_and_deactivating( - current_epoch, + ended_epoch, stake_history.as_deref(), new_rate_activation_epoch, ) @@ -345,13 +368,14 @@ fn calculate_epoch_stakes( match stake_history { Some(ref mut stake_history) => { log::info!( - "Stake_history add epoch{current_epoch} stake history:{stake_history_entry:?}" + "Stake_history add epoch{ended_epoch} stake history:{stake_history_entry:?}" ); - stake_history.add(current_epoch, stake_history_entry) + stake_history.add(ended_epoch, stake_history_entry) } None => log::warn!("Vote stake calculus without Stake History"), }; + //Done for verification not in the algo //get current stake history match crate::bootstrap::get_stakehistory_account(crate::RPC_URL.to_string()) .ok() @@ -360,15 +384,15 @@ fn calculate_epoch_stakes( }) { Some(rpc_history) => { log::info!( - "Stake_history new epoch:{current_epoch} C:{:?} RPC:{:?}", - stake_history.as_ref().map(|h| h.get(current_epoch)), - rpc_history.get(current_epoch) + "Stake_history ended epoch:{ended_epoch} C:{:?} RPC:{:?}", + stake_history.as_ref().map(|h| h.get(ended_epoch)), + rpc_history.get(ended_epoch) ); log::info!( "Stake_history last epoch:{} C:{:?} RPC:{:?}", - current_epoch - 1, - stake_history.as_ref().map(|h| h.get(current_epoch - 1)), - rpc_history.get(current_epoch - 1) + ended_epoch - 1, + stake_history.as_ref().map(|h| h.get(ended_epoch - 1)), + rpc_history.get(ended_epoch - 1) ); } None => log::error!("can't get rpc history from RPC"), @@ -382,7 +406,7 @@ fn calculate_epoch_stakes( let delegation = stake_account.stake; let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default(); *entry += delegation.stake( - next_epoch, + new_epoch, stake_history.as_deref(), new_rate_activation_epoch, ); @@ -488,11 +512,16 @@ fn calculate_leader_schedule( epoch: u64, slots_in_epoch: u64, ) -> HashMap> { - let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map + let stakes_map: HashMap = stake_vote_map .iter() .filter_map(|(_, (stake, vote_account))| { - (*stake > 0).then_some((vote_account.vote_data.node_pubkey, *stake)) + (*stake != 0u64).then_some((vote_account.vote_data.node_pubkey, *stake)) }) + .into_grouping_map() + .aggregate(|acc, _node_pubkey, stake| Some(acc.unwrap_or_default() + stake)); + let mut stakes: Vec<(Pubkey, u64)> = stakes_map + .into_iter() + .map(|(key, stake)| (key, stake)) .collect(); let mut seed = [0u8; 32]; diff --git a/stake_aggregate/src/main.rs b/stake_aggregate/src/main.rs index 5d75c00..6aef61c 100644 --- a/stake_aggregate/src/main.rs +++ b/stake_aggregate/src/main.rs @@ -247,6 +247,7 @@ async fn run_loop(mut client: GeyserGrpcClient) -> anyhow::Re done: false, sleep_time: 1, rpc_url: RPC_URL.to_string(), + current_epoch: current_epoch_state.get_current_epoch().epoch, }; let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap }); spawned_bootstrap_task.push(jh); diff --git a/stake_aggregate/src/stakestore.rs b/stake_aggregate/src/stakestore.rs index 1c8bad3..c98221b 100644 --- a/stake_aggregate/src/stakestore.rs +++ b/stake_aggregate/src/stakestore.rs @@ -262,6 +262,7 @@ pub fn merge_program_account_in_strake_map( }, ) .for_each(|(pk, delegated_stake, lamports)| { + log::info!("RPC merge {pk} stake:{delegated_stake:?}"); let stake = StoredStake { pubkey: pk, lamports,