Compare commits

...

4 Commits

Author SHA1 Message Date
musitdev 2dc96a7a50 log stake history changes 2023-10-04 18:08:47 +02:00
musitdev be86bb7414 remove slot test 2023-10-04 15:58:00 +02:00
musitdev ac0c3615a6 add some code to detect missing block or slot 2023-10-04 15:54:50 +02:00
musitdev 6add71f84c add stake history fetch before schedule calculus 2023-10-04 14:45:56 +02:00
4 changed files with 115 additions and 42 deletions

View File

@ -2,14 +2,13 @@ use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeSto
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use crate::Slot;
use futures_util::stream::FuturesUnordered;
use futures_util::TryFutureExt;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::RpcClient;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::Duration;
/*
Bootstrap state changes
@ -91,15 +90,15 @@ fn process_bootstrap_event(
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap => {
let jh = tokio::spawn({
let jh = tokio::task::spawn_blocking({
let rpc_url = data.rpc_url.clone();
let sleep_time = data.sleep_time;
async move {
move || {
log::info!("BootstrapEvent::InitBootstrap RECV");
if sleep_time > 0 {
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
std::thread::sleep(Duration::from_secs(sleep_time));
}
match crate::bootstrap::bootstrap_accounts(rpc_url).await {
match crate::bootstrap::bootstrap_accounts(rpc_url) {
Ok((stakes, votes, history)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history)
}
@ -183,65 +182,50 @@ fn process_bootstrap_event(
}
}
async fn bootstrap_accounts(
fn bootstrap_accounts(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account), ClientError> {
let furure = get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| async move {
get_vote_account(rpc_url)
.await
.map(|(votes, rpc_url)| (stakes, votes, rpc_url))
get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| {
get_vote_account(rpc_url).map(|(votes, rpc_url)| (stakes, votes, rpc_url))
})
.and_then(|(stakes, votes, rpc_url)| {
get_stakehistory_account(rpc_url).map(|history| (stakes, votes, history))
})
.and_then(|(stakes, votes, rpc_url)| async move {
get_stakehistory_account(rpc_url)
.await
.map(|history| (stakes, votes, history))
});
furure.await
}
async fn get_stake_account(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client
.get_program_accounts(&solana_sdk::stake::program::id())
.await;
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id());
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake.map(|stake| (stake, rpc_url))
}
async fn get_vote_account(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_vote = rpc_client
.get_program_accounts(&solana_sdk::vote::program::id())
.await;
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id());
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote.map(|votes| (votes, rpc_url))
}
async fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client
.get_account(&solana_sdk::sysvar::stake_history::id())
.await;
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
}

View File

@ -3,7 +3,6 @@ use crate::Slot;
use solana_account_decoder::parse_sysvar::SysvarAccountType;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_program::epoch_schedule::EpochSchedule;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
@ -45,6 +44,7 @@ impl CurrentEpochSlotState {
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;

View File

@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
const SCHEDULE_STAKE_BASE_FILE_NAME: &str = "aggregate_export_votestake";
pub const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
//pub const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
#[derive(Debug, Default)]
pub struct CalculatedSchedule {
@ -152,6 +152,7 @@ InitLeaderscedule MergeStore(stakes, votes, schedule)
*/
pub fn run_leader_schedule_events(
rpc_url: String,
event: LeaderScheduleEvent,
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
stakestore: &mut StakeStore,
@ -160,14 +161,14 @@ pub fn run_leader_schedule_events(
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
)> {
let result = process_leadershedule_event(event, stakestore, votestore);
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
match result {
LeaderScheduleResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
None
}
LeaderScheduleResult::Event(event) => {
run_leader_schedule_events(event, bootstrap_tasks, stakestore, votestore)
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
}
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
}
@ -195,6 +196,7 @@ enum LeaderScheduleResult {
//TODO remove desactivated account after leader schedule calculus.
fn process_leadershedule_event(
rpc_url: String,
event: LeaderScheduleEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
@ -204,6 +206,20 @@ fn process_leadershedule_event(
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
//For test TODO put in extract and restore process to avoid to clone.
let stake_history = stakestore.get_cloned_stake_history();
//TODO get a way to be updated of stake history.
//request the current state using RPC.
//TODO remove the await from the scheduler task.
let before_stake_histo = stake_history.clone();
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
.map(|account| crate::stakestore::read_historystake_from_account(account))
.unwrap_or_else(|_| {
log::error!("Error during stake history fetch. Use bootstrap one");
stake_history
});
log::info!(
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
schedule_epoch.epoch
);
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => {
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
@ -287,6 +303,58 @@ fn process_leadershedule_event(
}
}
// fn calculate_epoch_stakes(stake_map: &StakeMap) {
// let stake_history_entry =
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
// // Wrap up the prev epoch by adding new stake history entry for the
// // prev epoch.
// let stake_history_entry = thread_pool.install(|| {
// stake_delegations
// .par_iter()
// .fold(StakeActivationStatus::default, |acc, stake_account| {
// let delegation = stake_account.delegation();
// acc + delegation.stake_activating_and_deactivating(
// self.epoch,
// Some(&self.stake_history),
// new_rate_activation_epoch,
// )
// })
// .reduce(StakeActivationStatus::default, Add::add)
// });
// self.stake_history.add(self.epoch, stake_history_entry);
// self.epoch = next_epoch;
// // Refresh the stake distribution of vote accounts for the next epoch,
// // using new stake history.
// let delegated_stakes = thread_pool.install(|| {
// stake_delegations
// .par_iter()
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
// let delegation = stake_account.delegation();
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
// *entry += delegation.stake(
// self.epoch,
// Some(&self.stake_history),
// new_rate_activation_epoch,
// );
// delegated_stakes
// })
// .reduce(HashMap::default, merge)
// });
// self.vote_accounts = self
// .vote_accounts
// .iter()
// .map(|(&vote_pubkey, vote_account)| {
// let delegated_stake = delegated_stakes
// .get(&vote_pubkey)
// .copied()
// .unwrap_or_default();
// (vote_pubkey, (delegated_stake, vote_account.clone()))
// })
// .collect();
// }
fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
@ -307,7 +375,7 @@ fn calculate_leader_schedule_from_stake_map(
for storestake in stake_map.values() {
//get nodeid for vote account
let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
log::warn!(
log::info!(
"Vote account not found in vote map for stake vote account:{}",
&storestake.stake.voter_pubkey
);
@ -329,7 +397,7 @@ fn calculate_leader_schedule_from_stake_map(
.absolute_slot
.saturating_sub(ten_epoch_slot_long)
{
log::warn!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
, storestake.stake.voter_pubkey
,vote_account.vote_data.node_pubkey
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.

View File

@ -259,6 +259,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let mut stake_verification_sender =
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
//TODO remove. Store parent hash to see if we don't miss a block.
let mut parent_block_slot = None;
loop {
tokio::select! {
Some(req) = request_rx.recv() => {
@ -330,6 +333,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//Manage RPC call result execution
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
RPC_URL.to_string(),
event,
&mut spawned_leader_schedule_task,
&mut stakestore,
@ -432,6 +436,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
if bootstrap_data.done {
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
RPC_URL.to_string(),
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
@ -445,7 +450,23 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
}
Some(UpdateOneof::Block(block)) => {
log::trace!("Receive Block at slot: {}", block.slot);
log::trace!("Receive Block at slot: {} hash:{} parent_slot:{}",
block.slot,
block.blockhash,
block.parent_slot,
);
//TODO remove; Detect missing block
if let Some(parent_block_slot) = parent_block_slot {
if parent_block_slot != block.parent_slot {
log::error!("Bad parent slot stored:{} block:{}, miss a block"
,parent_block_slot,block.parent_slot
);
}
}
parent_block_slot = Some(block.slot);
//parse to detect stake merge tx.
//first in the main thread then in a specific thread.
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();