add stake reloading at epoch change.

This commit is contained in:
musitdev 2023-09-06 13:50:36 +02:00
parent ffc9fc64dd
commit 7b60d64d6c
3 changed files with 34 additions and 23 deletions

View File

@ -6,9 +6,8 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo; use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation; use solana_sdk::stake::state::Delegation;
use std::collections::BTreeSet; use std::collections::BTreeMap;
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
const MAX_EPOCH_VALUE: u64 = 18446744073709551615; const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
@ -188,7 +187,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
pub fn save_schedule_on_file<T: serde::Serialize>( pub fn save_schedule_on_file<T: serde::Serialize>(
name: &str, name: &str,
map: &HashMap<String, T>, map: &BTreeMap<String, T>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let serialized_map = serde_json::to_string(map).unwrap(); let serialized_map = serde_json::to_string(map).unwrap();
@ -219,7 +218,7 @@ pub fn build_current_stakes(
current_epoch_info: &EpochInfo, current_epoch_info: &EpochInfo,
rpc_url: String, rpc_url: String,
commitment: CommitmentConfig, commitment: CommitmentConfig,
) -> HashMap<String, (u64, u64)> { ) -> BTreeMap<String, (u64, u64)> {
// Fetch stakes in current epoch // Fetch stakes in current epoch
let rpc_client = let rpc_client =
RpcClient::new_with_timeout_and_commitment(rpc_url, Duration::from_secs(600), commitment); //CommitmentConfig::confirmed()); RpcClient::new_with_timeout_and_commitment(rpc_url, Duration::from_secs(600), commitment); //CommitmentConfig::confirmed());
@ -228,7 +227,8 @@ pub fn build_current_stakes(
.unwrap(); .unwrap();
//log::trace!("get_program_accounts:{:?}", response); //log::trace!("get_program_accounts:{:?}", response);
let mut stakes_aggregated = HashMap::<String, (u64, u64)>::new(); //use btreemap to always sort the same way.
let mut stakes_aggregated = BTreeMap::<String, (u64, u64)>::new();
for (pubkey, account) in response { for (pubkey, account) in response {
// Zero-length accounts owned by the stake program are system accounts that were re-assigned and are to be // Zero-length accounts owned by the stake program are system accounts that were re-assigned and are to be
// ignored // ignored
@ -250,19 +250,22 @@ pub fn build_current_stakes(
_ => (), _ => (),
} }
} }
stake_map.iter().for_each(|(pubkey, stake)| { stake_map
log::trace!( .iter()
"LCOAL Stake {pubkey} account:{:?} stake:{stake:?}", .filter(|(pubkey, stake)| is_stake_to_add(**pubkey, &stake.stake, current_epoch_info))
stake.stake.voter_pubkey .for_each(|(pubkey, stake)| {
); log::trace!(
(stakes_aggregated "LCOAL Stake {pubkey} account:{:?} stake:{stake:?}",
.entry(stake.stake.voter_pubkey.to_string()) stake.stake.voter_pubkey
.or_insert((0, 0))) );
.1 += stake.stake.stake; (stakes_aggregated
}); .entry(stake.stake.voter_pubkey.to_string())
.or_insert((0, 0)))
.1 += stake.stake.stake;
});
//verify the list //verify the list
let diff_list: HashMap<String, (u64, u64)> = stakes_aggregated let diff_list: BTreeMap<String, (u64, u64)> = stakes_aggregated
.iter() .iter()
.filter(|(_, (rpc, local))| rpc != local) .filter(|(_, (rpc, local))| rpc != local)
.map(|(pk, vals)| (pk.clone(), vals.clone())) .map(|(pk, vals)| (pk.clone(), vals.clone()))

View File

@ -10,6 +10,9 @@
} }
' '
*/ */
//TODO: add stake verify that it' not already desactivated.
use crate::stakestore::extract_stakestore; use crate::stakestore::extract_stakestore;
use crate::stakestore::merge_stakestore; use crate::stakestore::merge_stakestore;
use crate::stakestore::StakeStore; use crate::stakestore::StakeStore;
@ -108,7 +111,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}; };
let mut next_epoch_start_slot = let mut next_epoch_start_slot =
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot; current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
log::info!("Run_loop init current_epoch:{current_epoch:?}"); let mut current_epoch_start_slot = current_epoch.absolute_slot - current_epoch.slot_index;
log::info!("Run_loop init {current_epoch_start_slot} {next_epoch_start_slot} current_epoch:{current_epoch:?}");
let mut spawned_task_toexec = FuturesUnordered::new(); let mut spawned_task_toexec = FuturesUnordered::new();
let mut spawned_task_result = FuturesUnordered::new(); let mut spawned_task_result = FuturesUnordered::new();
@ -208,7 +212,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized()); let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized());
let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await; let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await;
log::info!("TaskToExec RpcGetPa END"); log::info!("TaskToExec RpcGetPa END");
//let res = crate::rpc::get_program_accounts(RPC_URL, &solana_sdk::stake::program::id()).await;
TaskResult::RpcGetPa(res) TaskResult::RpcGetPa(res)
}, },
TaskToExec::RpcGetCurrentEpoch => { TaskToExec::RpcGetCurrentEpoch => {
@ -236,8 +239,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//merge new PA with stake map in a specific thread //merge new PA with stake map in a specific thread
log::trace!("Run_loop before Program account stake merge START"); log::trace!("Run_loop before Program account stake merge START");
let jh = tokio::task::spawn_blocking(|| { let jh = tokio::task::spawn_blocking(move || {
crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list); //update pa_list to set slot update to start epoq one.
crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list, current_epoch_start_slot);
TaskResult::MergePAList(stake_map) TaskResult::MergePAList(stake_map)
}); });
spawned_task_result.push(jh); spawned_task_result.push(jh);
@ -386,7 +390,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
current_slot.update_slot(&slot); current_slot.update_slot(&slot);
if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slot can be non consecutif. if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slot can be non consecutif.
log::info!("End epoch slot. Calculate schedule at current slot:{}", current_slot.confirmed_slot);
log::info!("End epoch slot, change epoch. Calculate schedule at current slot:{}", current_slot.confirmed_slot);
let Ok(stake_map) = extract_stakestore(&mut stakestore) else { let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a getPA is currently running."); log::info!("Epoch schedule aborted because a getPA is currently running.");
continue; continue;
@ -395,6 +400,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//change epoch. Change manually then update using RPC. //change epoch. Change manually then update using RPC.
current_epoch.epoch +=1; current_epoch.epoch +=1;
current_epoch.slot_index = 1; current_epoch.slot_index = 1;
current_epoch_start_slot = next_epoch_start_slot;
next_epoch_start_slot = next_epoch_start_slot + current_epoch.slots_in_epoch; //set to next epochs. next_epoch_start_slot = next_epoch_start_slot + current_epoch.slots_in_epoch; //set to next epochs.
log::info!("End slot epoch update calculated next epoch:{current_epoch:?}"); log::info!("End slot epoch update calculated next epoch:{current_epoch:?}");
@ -412,7 +418,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
spawned_task_result.push(jh); spawned_task_result.push(jh);
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch)); spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch));
//reload current Stake account a epoch change to synchronize.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
} }
} }

View File

@ -142,6 +142,7 @@ impl StakeStore {
pub fn merge_program_account_in_strake_map( pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap, stake_map: &mut StakeMap,
pa_list: Vec<(Pubkey, Account)>, pa_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
) { ) {
pa_list pa_list
.into_iter() .into_iter()
@ -158,7 +159,7 @@ pub fn merge_program_account_in_strake_map(
let stake = StoredStake { let stake = StoredStake {
pubkey: pk, pubkey: pk,
stake: delegated_stake, stake: delegated_stake,
last_update_slot: 0, last_update_slot,
write_version: 0, write_version: 0,
}; };
stake_map_insert_stake(stake_map, pk, stake); stake_map_insert_stake(stake_map, pk, stake);