rework stake history sync algo

This commit is contained in:
musitdev 2023-10-18 12:58:54 +02:00
parent e80b48739e
commit 4a0f3e510f
7 changed files with 113 additions and 21 deletions

View File

@ -15,6 +15,10 @@ path = "bin/parse_validator_stake.rs"
name = "readstakes" name = "readstakes"
path = "bin/read_stake_export.rs" path = "bin/read_stake_export.rs"
[[bin]]
name = "stakehistory"
path = "bin/stakehistory.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -43,8 +47,8 @@ jsonrpsee = { version = "0.20.0", features = ["macros", "server", "full"] }
#jsonrpsee-types = "0.20.0" #jsonrpsee-types = "0.20.0"
thiserror = "1.0.40" thiserror = "1.0.40"
yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" } #yellowstone-grpc-client = { path = "../../yellowstone-grpc/yellowstone-grpc-client" }
yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" } #yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto" }
#yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" } #yellowstone-grpc-client = { git = "http://github.com/rpcpool/yellowstone-grpc", rev = "c89b89dfc5f03f11f45ac4a6e832386a1d94cb67" }
@ -53,6 +57,9 @@ yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto
#yellowstone-grpc-client = "1.11.0+solana.1.16.14" #yellowstone-grpc-client = "1.11.0+solana.1.16.14"
#yellowstone-grpc-proto = "1.10.0+solana.1.16.14" #yellowstone-grpc-proto = "1.10.0+solana.1.16.14"
yellowstone-grpc-client = "1.9.0"
yellowstone-grpc-proto = "1.9.0"
solana-sdk = "1.16.14" solana-sdk = "1.16.14"
solana-client = "1.16.14" solana-client = "1.16.14"
solana-ledger = "1.16.14" solana-ledger = "1.16.14"

View File

@ -0,0 +1,30 @@
use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient;
use solana_sdk::account::Account;
use solana_sdk::account::AccountSharedData;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::stake_history::StakeHistory;
use std::time::Duration;
const RPC_URL: &str = "http://147.28.169.13:8899";
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
}
fn main() {
let history = get_stakehistory_account(RPC_URL.to_string())
.ok()
.and_then(|account| {
solana_sdk::account::from_account::<StakeHistory, _>(&AccountSharedData::from(account))
});
println!("RPC Stake_history: {:?}", history);
}

View File

@ -78,6 +78,7 @@ pub struct BootstrapData {
pub done: bool, pub done: bool,
pub sleep_time: u64, pub sleep_time: u64,
pub rpc_url: String, pub rpc_url: String,
pub current_epoch: u64,
} }
fn process_bootstrap_event( fn process_bootstrap_event(
@ -135,6 +136,25 @@ fn process_bootstrap_event(
log::error!("Bootstrap error, can't read stake history."); log::error!("Bootstrap error, can't read stake history.");
} }
log::info!(
"Bootstrap epoch:{} stake history:{:?}",
data.current_epoch,
stake_history,
);
//verify that the history is up to date otherwise exit
//because schedule will be false.
if stake_history
.as_ref()
.and_then(|h| h.get(data.current_epoch - 1))
.is_none()
{
panic!(
"Bootstrap stake history not up to date for epoch:{} exit",
data.current_epoch
);
}
//merge new PA with stake map and vote map in a specific task //merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({ let jh = tokio::task::spawn_blocking({
move || { move || {

View File

@ -113,15 +113,19 @@ impl CurrentEpochSlotState {
} }
pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> { pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized()); let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?; let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?;
let confirmed_slot = rpc_client.get_slot().await.unwrap_or(0);
Ok(CurrentEpochSlotState { let mut state = CurrentEpochSlotState {
current_slot: CurrentSlot::default(), current_slot: CurrentSlot::default(),
epoch_cache, epoch_cache,
current_epoch_value: Epoch::default(), current_epoch_value: Epoch::default(),
}) };
state.current_slot.confirmed_slot = confirmed_slot;
state.current_epoch_value = state.epoch_cache.get_epoch_at_slot(confirmed_slot);
Ok(state)
} }
// pub fn current_epoch_start_slot(&self) -> Slot { // pub fn current_epoch_start_slot(&self) -> Slot {

View File

@ -231,16 +231,16 @@ fn process_leadershedule_event(
let epoch_schedule = Arc::clone(&epoch_schedule); let epoch_schedule = Arc::clone(&epoch_schedule);
let jh = tokio::task::spawn_blocking({ let jh = tokio::task::spawn_blocking({
move || { move || {
let next_epoch = current_epoch + 1;
let epoch_vote_stakes = calculate_epoch_stakes( let epoch_vote_stakes = calculate_epoch_stakes(
&stake_map, &stake_map,
&vote_map, &vote_map,
current_epoch, current_epoch,
next_epoch,
stake_history.as_mut(), stake_history.as_mut(),
&epoch_schedule, &epoch_schedule,
); );
//calculate the schedule for next epoch using the current epoch start stake.
let next_epoch = current_epoch + 1;
let leader_schedule = calculate_leader_schedule( let leader_schedule = calculate_leader_schedule(
&epoch_vote_stakes, &epoch_vote_stakes,
next_epoch, next_epoch,
@ -320,24 +320,47 @@ fn process_leadershedule_event(
} }
} }
// /// get the epoch for which the given slot should save off
// /// information about stakers
// pub fn get_leader_schedule_epoch(&self, slot: Slot) -> Epoch {
// if slot < self.first_normal_slot {
// // until we get to normal slots, behave as if leader_schedule_slot_offset == slots_per_epoch
// self.get_epoch_and_slot_index(slot).0.saturating_add(1)
// } else {
// let new_slots_since_first_normal_slot = slot.saturating_sub(self.first_normal_slot);
// let new_first_normal_leader_schedule_slot =
// new_slots_since_first_normal_slot.saturating_add(self.leader_schedule_slot_offset);
// let new_epochs_since_first_normal_leader_schedule =
// new_first_normal_leader_schedule_slot
// .checked_div(self.slots_per_epoch)
// .unwrap_or(0);
// self.first_normal_epoch
// .saturating_add(new_epochs_since_first_normal_leader_schedule)
// }
// }
fn calculate_epoch_stakes( fn calculate_epoch_stakes(
stake_map: &StakeMap, stake_map: &StakeMap,
vote_map: &VoteMap, vote_map: &VoteMap,
current_epoch: u64, new_epoch: u64,
next_epoch: u64,
mut stake_history: Option<&mut StakeHistory>, mut stake_history: Option<&mut StakeHistory>,
epoch_schedule: &EpochSchedule, epoch_schedule: &EpochSchedule,
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> { ) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
//update stake history with current end epoch stake values. //update stake history with current end epoch stake values.
let new_rate_activation_epoch = let new_rate_activation_epoch =
FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule); FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule);
log::info!("calculate_epoch_stakes new_epoch:{new_epoch}",);
let ended_epoch = new_epoch - 1;
//update stake history for the ended epoch.
let stake_history_entry = let stake_history_entry =
stake_map stake_map
.values() .values()
.fold(StakeActivationStatus::default(), |acc, stake_account| { .fold(StakeActivationStatus::default(), |acc, stake_account| {
let delegation = stake_account.stake; let delegation = stake_account.stake;
acc + delegation.stake_activating_and_deactivating( acc + delegation.stake_activating_and_deactivating(
current_epoch, ended_epoch,
stake_history.as_deref(), stake_history.as_deref(),
new_rate_activation_epoch, new_rate_activation_epoch,
) )
@ -345,13 +368,14 @@ fn calculate_epoch_stakes(
match stake_history { match stake_history {
Some(ref mut stake_history) => { Some(ref mut stake_history) => {
log::info!( log::info!(
"Stake_history add epoch{current_epoch} stake history:{stake_history_entry:?}" "Stake_history add epoch{ended_epoch} stake history:{stake_history_entry:?}"
); );
stake_history.add(current_epoch, stake_history_entry) stake_history.add(ended_epoch, stake_history_entry)
} }
None => log::warn!("Vote stake calculus without Stake History"), None => log::warn!("Vote stake calculus without Stake History"),
}; };
//Done for verification not in the algo
//get current stake history //get current stake history
match crate::bootstrap::get_stakehistory_account(crate::RPC_URL.to_string()) match crate::bootstrap::get_stakehistory_account(crate::RPC_URL.to_string())
.ok() .ok()
@ -360,15 +384,15 @@ fn calculate_epoch_stakes(
}) { }) {
Some(rpc_history) => { Some(rpc_history) => {
log::info!( log::info!(
"Stake_history new epoch:{current_epoch} C:{:?} RPC:{:?}", "Stake_history ended epoch:{ended_epoch} C:{:?} RPC:{:?}",
stake_history.as_ref().map(|h| h.get(current_epoch)), stake_history.as_ref().map(|h| h.get(ended_epoch)),
rpc_history.get(current_epoch) rpc_history.get(ended_epoch)
); );
log::info!( log::info!(
"Stake_history last epoch:{} C:{:?} RPC:{:?}", "Stake_history last epoch:{} C:{:?} RPC:{:?}",
current_epoch - 1, ended_epoch - 1,
stake_history.as_ref().map(|h| h.get(current_epoch - 1)), stake_history.as_ref().map(|h| h.get(ended_epoch - 1)),
rpc_history.get(current_epoch - 1) rpc_history.get(ended_epoch - 1)
); );
} }
None => log::error!("can't get rpc history from RPC"), None => log::error!("can't get rpc history from RPC"),
@ -382,7 +406,7 @@ fn calculate_epoch_stakes(
let delegation = stake_account.stake; let delegation = stake_account.stake;
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default(); let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry += delegation.stake( *entry += delegation.stake(
next_epoch, new_epoch,
stake_history.as_deref(), stake_history.as_deref(),
new_rate_activation_epoch, new_rate_activation_epoch,
); );
@ -488,11 +512,16 @@ fn calculate_leader_schedule(
epoch: u64, epoch: u64,
slots_in_epoch: u64, slots_in_epoch: u64,
) -> HashMap<String, Vec<usize>> { ) -> HashMap<String, Vec<usize>> {
let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map let stakes_map: HashMap<Pubkey, u64> = stake_vote_map
.iter() .iter()
.filter_map(|(_, (stake, vote_account))| { .filter_map(|(_, (stake, vote_account))| {
(*stake > 0).then_some((vote_account.vote_data.node_pubkey, *stake)) (*stake != 0u64).then_some((vote_account.vote_data.node_pubkey, *stake))
}) })
.into_grouping_map()
.aggregate(|acc, _node_pubkey, stake| Some(acc.unwrap_or_default() + stake));
let mut stakes: Vec<(Pubkey, u64)> = stakes_map
.into_iter()
.map(|(key, stake)| (key, stake))
.collect(); .collect();
let mut seed = [0u8; 32]; let mut seed = [0u8; 32];

View File

@ -247,6 +247,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
done: false, done: false,
sleep_time: 1, sleep_time: 1,
rpc_url: RPC_URL.to_string(), rpc_url: RPC_URL.to_string(),
current_epoch: current_epoch_state.get_current_epoch().epoch,
}; };
let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap }); let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap });
spawned_bootstrap_task.push(jh); spawned_bootstrap_task.push(jh);

View File

@ -262,6 +262,7 @@ pub fn merge_program_account_in_strake_map(
}, },
) )
.for_each(|(pk, delegated_stake, lamports)| { .for_each(|(pk, delegated_stake, lamports)| {
log::info!("RPC merge {pk} stake:{delegated_stake:?}");
let stake = StoredStake { let stake = StoredStake {
pubkey: pk, pubkey: pk,
lamports, lamports,