Compare commits

...

2 Commits

Author SHA1 Message Date
musitdev c8879a72c9 add some test log to verify stake history at startup 2023-09-28 18:50:44 +02:00
musitdev c63da03e56 add stake history to stake calculus 2023-09-28 18:17:19 +02:00
6 changed files with 183 additions and 133 deletions

View File

@ -50,8 +50,10 @@ yellowstone-grpc-proto = { path = "../../yellowstone-grpc/yellowstone-grpc-proto
#yellowstone-grpc-client = "1.8.0+solana.1.16.1"
#yellowstone-grpc-proto = "1.8.0+solana.1.16.1"
solana-sdk = "1.16.*"
solana-client = "1.16.*"
solana-ledger = "1.16.*"
solana-rpc-client-api = "1.16.*"
solana-version = "1.16.*"
solana-sdk = "1.16.14"
solana-client = "1.16.14"
solana-ledger = "1.16.14"
solana-rpc-client-api = "1.16.14"
solana-version = "1.16.14"
solana-account-decoder = "1.16.14"
solana-program = "1.16.14"

View File

@ -57,17 +57,13 @@ pub fn run_bootstrap_events(
pub enum BootstrapEvent {
InitBootstrap,
BootstrapAccountsFetched(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
BootstrapAccountsFetched(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account),
StoreExtracted(
StakeMap,
VoteMap,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Account,
),
AccountsMerged(StakeMap, VoteMap),
Exit,
@ -135,6 +131,18 @@ fn process_bootstrap_event(
}
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
log::info!("BootstrapEvent::StoreExtracted RECV");
match crate::stakestore::read_historystake_from_account(history) {
Some(stake_history) => {
log::info!(
"Read stake history done with history len:{}",
stake_history.len()
);
stakestore.set_stake_history(stake_history);
}
None => log::error!("Bootstrap error, can't read stake history."),
}
//merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({
let current_epoch = data.current_epoch;
@ -143,7 +151,6 @@ fn process_bootstrap_event(
crate::stakestore::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
history,
0, //with RPC no way to know the slot of the account update. Set to 0.
current_epoch,
);
@ -152,6 +159,7 @@ fn process_bootstrap_event(
votes,
0, //with RPC no way to know the slot of the account update. Set to 0.
);
BootstrapEvent::AccountsMerged(stake_map, vote_map)
}
});
@ -177,14 +185,7 @@ fn process_bootstrap_event(
async fn bootstrap_accounts(
rpc_url: String,
) -> Result<
(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
ClientError,
> {
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account), ClientError> {
let furure = get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| async move {
get_vote_account(rpc_url)
@ -231,7 +232,7 @@ async fn get_vote_account(
res_vote.map(|votes| (votes, rpc_url))
}
async fn get_stakehistory_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
async fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
@ -239,11 +240,8 @@ async fn get_stakehistory_account(rpc_url: String) -> Result<Vec<(Pubkey, Accoun
CommitmentConfig::finalized(),
);
let res_stake = rpc_client
.get_program_accounts(&solana_sdk::sysvar::stake_history::id())
.get_account(&solana_sdk::sysvar::stake_history::id())
.await;
log::info!(
"TaskToExec RpcGetStakeHistory END with len:{:?}",
res_stake.as_ref().map(|history| history.len())
);
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
}

View File

@ -1,7 +1,9 @@
use crate::leader_schedule::LeaderScheduleEvent;
use crate::Slot;
use solana_account_decoder::parse_sysvar::SysvarAccountType;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_program::epoch_schedule::EpochSchedule;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
@ -30,6 +32,27 @@ pub struct CurrentEpochSlotState {
impl CurrentEpochSlotState {
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
//get reduce_stake_warmup_cooldown feature info.
//NOT AND ACCOUNT. Get from config.
// let reduce_stake_warmup_cooldown_epoch = rpc_client
// .get_account(&feature_set::reduce_stake_warmup_cooldown::id())
// .await?;
// let reduce_stake_warmup_cooldown_epoch = bincode::deserialize(&reduce_stake_warmup_cooldown_epoch.data[..])
// .ok()
// .map(SysvarAccountType::EpochSchedule);
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
let sysvar_epoch_schedule = bincode::deserialize(&res_epoch.data[..])
.ok()
.map(SysvarAccountType::EpochSchedule);
log::info!("sysvar_epoch_schedule {sysvar_epoch_schedule:?}");
// Fetch current epoch
let current_epoch = rpc_client.get_epoch_info().await?;
let next_epoch_start_slot =

View File

@ -10,7 +10,7 @@ use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs::File;
@ -166,7 +166,7 @@ pub fn run_leader_schedule_events(
pub enum LeaderScheduleEvent {
InitLeaderschedule(EpochInfo),
CalculateScedule(StakeMap, VoteMap, EpochInfo),
CalculateScedule(StakeMap, VoteMap, EpochInfo, Option<StakeHistory>),
MergeStoreAndSaveSchedule(
StakeMap,
VoteMap,
@ -193,10 +193,17 @@ fn process_leadershedule_event(
match event {
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => {
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
//For test TODO put in extract and restore process to avoid to clone.
let stake_history = stakestore.get_cloned_stake_history();
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => LeaderScheduleResult::Event(
LeaderScheduleEvent::CalculateScedule(stake_map, vote_map, schedule_epoch),
),
(Ok(stake_map), Ok(vote_map)) => {
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
stake_map,
vote_map,
schedule_epoch,
stake_history,
))
}
_ => {
log::warn!("process_leadershedule_event error during extract store");
let jh = tokio::spawn(async move {
@ -207,7 +214,12 @@ fn process_leadershedule_event(
}
}
}
LeaderScheduleEvent::CalculateScedule(stake_map, vote_map, schedule_epoch) => {
LeaderScheduleEvent::CalculateScedule(
stake_map,
vote_map,
schedule_epoch,
stake_history,
) => {
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
let jh = tokio::task::spawn_blocking({
move || {
@ -216,6 +228,7 @@ fn process_leadershedule_event(
&stake_map,
&vote_map,
&schedule_epoch,
stake_history.as_ref(),
);
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
if let Err(err) = save_schedule_vote_stakes(
@ -269,12 +282,14 @@ fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
current_epoch_info: &EpochInfo,
stake_history: Option<&StakeHistory>,
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
let mut stakes = HashMap::<Pubkey, u64>::new();
log::trace!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{}",
log::info!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
vote_map.len(),
stake_map.len()
stake_map.len(),
stake_history.as_ref().map(|h| h.len())
);
let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
@ -282,35 +297,41 @@ fn calculate_leader_schedule_from_stake_map(
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
for storestake in stake_map.values() {
//log::info!("Program_accounts stake:{stake:#?}");
if is_stake_to_add(storestake.pubkey, &storestake.stake, &current_epoch_info) {
// Add the stake in this stake account to the total for the delegated-to vote account
//get nodeid for vote account
let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
log::warn!(
"Vote account not found in vote map for stake vote account:{}",
&storestake.stake.voter_pubkey
);
continue;
};
//remove vote account that hasn't vote since 10 epoch.
//on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
//hasn't vote since a long time but still return on RPC call get_voteaccounts.
//the validator don't use it for leader schedule.
if vote_account.vote_data.root_slot.unwrap_or(0)
< current_epoch_info
.absolute_slot
.saturating_sub(ten_epoch_slot_long)
{
log::warn!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Remove leader_schedule."
// if is_stake_to_add(storestake.pubkey, &storestake.stake, &current_epoch_info) {
// Add the stake in this stake account to the total for the delegated-to vote account
//get nodeid for vote account
let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
log::warn!(
"Vote account not found in vote map for stake vote account:{}",
&storestake.stake.voter_pubkey
);
continue;
};
//TODO validate the number of epoch.
//remove vote account that hasn't vote since 10 epoch.
//on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
//hasn't vote since a long time but still return on RPC call get_voteaccounts.
//the validator don't use it for leader schedule.
if vote_account.vote_data.root_slot.unwrap_or(0)
< current_epoch_info
.absolute_slot
.saturating_sub(ten_epoch_slot_long)
{
log::warn!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
, storestake.stake.voter_pubkey
,vote_account.vote_data.node_pubkey
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
, storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
);
} else {
*(stakes
.entry(vote_account.vote_data.node_pubkey)
.or_insert(0)) += storestake.stake.stake;
}
} else {
*(stakes
.entry(vote_account.vote_data.node_pubkey)
.or_insert(0)) += storestake
.stake
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
.stake(current_epoch_info.epoch, stake_history, Some(0));
}
// }
}
let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
@ -324,31 +345,31 @@ fn calculate_leader_schedule_from_stake_map(
Ok((leader_schedule, schedule_stakes))
}
fn is_stake_to_add(
stake_pubkey: Pubkey,
stake: &Delegation,
current_epoch_info: &EpochInfo,
) -> bool {
//On test validator all stakes are attributes to an account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE.
//It's considered as activated stake.
if stake.activation_epoch == MAX_EPOCH_VALUE {
log::info!(
"Found account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE use it: {}",
stake_pubkey.to_string()
);
} else {
// Ignore stake accounts activated in this epoch (or later, to include activation_epoch of
// u64::MAX which indicates no activation ever happened)
if stake.activation_epoch >= current_epoch_info.epoch {
return false;
}
// Ignore stake accounts deactivated before this epoch
if stake.deactivation_epoch < current_epoch_info.epoch {
return false;
}
}
true
}
// fn is_stake_to_add(
// stake_pubkey: Pubkey,
// stake: &Delegation,
// current_epoch_info: &EpochInfo,
// ) -> bool {
// //On test validator all stakes are attributes to an account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE.
// //It's considered as activated stake.
// if stake.activation_epoch == MAX_EPOCH_VALUE {
// log::info!(
// "Found account with stake.delegation.activation_epoch == MAX_EPOCH_VALUE use it: {}",
// stake_pubkey.to_string()
// );
// } else {
// // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of
// // u64::MAX which indicates no activation ever happened)
// if stake.activation_epoch >= current_epoch_info.epoch {
// return false;
// }
// // Ignore stake accounts deactivated before this epoch
// if stake.deactivation_epoch < current_epoch_info.epoch {
// return false;
// }
// }
// true
// }
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
@ -493,6 +514,7 @@ use borsh::BorshDeserialize;
use solana_sdk::stake::state::StakeState;
pub fn build_current_stakes(
stake_map: &crate::stakestore::StakeMap,
stake_history: Option<&StakeHistory>,
current_epoch_info: &EpochInfo,
rpc_url: String,
commitment: CommitmentConfig,
@ -518,7 +540,11 @@ pub fn build_current_stakes(
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
StakeState::Stake(_, stake) => {
//vote account version
if is_stake_to_add(pubkey, &stake.delegation, current_epoch_info) {
let effective_stake =
stake
.delegation
.stake(current_epoch_info.epoch, stake_history, Some(0));
if effective_stake > 0 {
// Add the stake in this stake account to the total for the delegated-to vote account
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
(stakes_aggregated
@ -530,7 +556,7 @@ pub fn build_current_stakes(
.or_insert(vec![]);
st_list.push((
pubkey.to_string(),
stake.delegation.stake,
effective_stake,
stake.delegation.activation_epoch,
stake.delegation.deactivation_epoch,
));
@ -542,8 +568,14 @@ pub fn build_current_stakes(
let mut local_stakes = BTreeMap::<String, Vec<(String, u64, u64, u64)>>::new();
stake_map
.iter()
.filter(|(pubkey, stake)| is_stake_to_add(**pubkey, &stake.stake, current_epoch_info))
.for_each(|(pubkey, stake)| {
.filter_map(|(pubkey, stake)| {
let effective_stake =
stake
.stake
.stake(current_epoch_info.epoch, stake_history, Some(0));
(effective_stake > 0).then(|| (pubkey, stake, effective_stake))
})
.for_each(|(pubkey, stake, effective_stake)| {
// log::trace!(
// "LCOAL Stake {pubkey} account:{:?} stake:{stake:?}",
// stake.stake.voter_pubkey
@ -557,7 +589,7 @@ pub fn build_current_stakes(
.or_insert(vec![]);
st_list.push((
pubkey.to_string(),
stake.stake.stake,
effective_stake,
stake.stake.activation_epoch,
stake.stake.deactivation_epoch,
));

View File

@ -53,13 +53,9 @@ use crate::votestore::VoteStore;
use anyhow::bail;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_sdk::account::Account;
use solana_sdk::account::AccountSharedData;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake_history::StakeHistory;
use solana_sdk::sysvar::rent::Rent;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;
use std::env;
@ -282,10 +278,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
tokio::task::spawn_blocking({
log::info!("RPC start save_stakes");
let current_stakes = stakestore.get_cloned_stake_map();
let history = stakestore.get_cloned_stake_history();
let move_epoch = current_epoch_state.current_epoch.clone();
move || {
let current_stake = crate::leader_schedule::build_current_stakes(
&current_stakes,
history.as_ref(),
&move_epoch,
RPC_URL.to_string(),
CommitmentConfig::confirmed(),
@ -554,16 +552,6 @@ impl AccountPretty {
}
Ok(VoteState::deserialize(&self.data)?)
}
fn read_stake_history(&self) -> Option<StakeHistory> {
// if self.data.is_empty() {
// log::warn!("Stake history account with empty data. Can't read StakeHistory.");
// bail!("Error: read StakeHistory account with empty data");
// }
solana_sdk::account::from_account::<StakeHistory, _>(&program_account(&self.data))
// Ok(StakeHistory::deserialize(&self.data)?)
}
}
impl std::fmt::Display for AccountPretty {
@ -608,12 +596,12 @@ fn read_account(
})
}
fn program_account(program_data: &[u8]) -> AccountSharedData {
AccountSharedData::from(Account {
lamports: Rent::default().minimum_balance(program_data.len()).min(1),
data: program_data.to_vec(),
owner: solana_sdk::bpf_loader::id(),
executable: true,
rent_epoch: 0,
})
}
// fn program_account(program_data: &[u8]) -> AccountSharedData {
// AccountSharedData::from(Account {
// lamports: Rent::default().minimum_balance(program_data.len()).min(1),
// data: program_data.to_vec(),
// owner: solana_sdk::bpf_loader::id(),
// executable: true,
// rent_epoch: 0,
// })
// }

View File

@ -12,6 +12,7 @@ use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use solana_sdk::stake_history::StakeHistory;
use std::collections::HashMap;
use std::str::FromStr;
use tokio::sync::mpsc::Sender;
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
@ -127,12 +128,11 @@ impl StoredStake {
self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
|| self.stake.deactivation_epoch >= current_epoch
}
fn add_history(&mut self) {}
}
#[derive(Debug, Default)]
pub struct StakeStore {
stakes: StakeMap,
stake_history: Option<StakeHistory>,
updates: Vec<ExtractedAction>,
extracted: bool,
}
@ -141,11 +141,16 @@ impl StakeStore {
pub fn new(capacity: usize) -> Self {
StakeStore {
stakes: HashMap::with_capacity(capacity),
stake_history: None,
updates: vec![],
extracted: false,
}
}
pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
self.stake_history = Some(stake_history);
}
pub fn nb_stake_account(&self) -> usize {
self.stakes.len()
}
@ -153,6 +158,9 @@ impl StakeStore {
pub fn get_cloned_stake_map(&self) -> StakeMap {
self.stakes.clone()
}
pub fn get_cloned_stake_history(&self) -> Option<StakeHistory> {
self.stake_history.clone()
}
pub fn notify_stake_change(
&mut self,
@ -238,6 +246,7 @@ impl StakeStore {
}
let stakestore = StakeStore {
stakes: HashMap::new(),
stake_history: self.stake_history,
updates: self.updates,
extracted: true,
};
@ -251,6 +260,7 @@ impl StakeStore {
}
let mut stakestore = StakeStore {
stakes,
stake_history: self.stake_history,
updates: vec![],
extracted: false,
};
@ -262,6 +272,21 @@ impl StakeStore {
action.update_epoch(current_epoch);
stakestore.process_stake_action(action);
}
//verify one stake account to test. TODO remove
let stake_account = stakestore
.stakes
.get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
let stake = stake_account.map(|stake| {
stake
.stake
.stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
});
log::info!(
"merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
stake
);
Ok(stakestore)
}
@ -281,12 +306,9 @@ impl StakeStore {
pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap,
stakes_list: Vec<(Pubkey, Account)>,
stakehistory_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
current_epoch: u64,
) {
let mut stake_history_map: HashMap<Pubkey, Account> = stakehistory_list.into_iter().collect();
stakes_list
.into_iter()
.filter_map(
@ -306,24 +328,9 @@ pub fn merge_program_account_in_strake_map(
last_update_slot,
write_version: 0,
};
if let Some(history) = stake_history_map.remove(&stake.pubkey) {
log::info!(
"merge_program_account_in_strake_map found stake history for account:{}",
stake.pubkey
);
match read_historystake_from_account(history) {
Some(history) => (),
None => (),
}
}
stake_map_notify_stake(stake_map, stake, current_epoch);
});
log::info!(
"merge_program_account_in_strake_map history account not processed:{}",
stake_history_map.len()
);
}
pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<Delegation>> {