Compare commits
4 Commits
4ddfa77462
...
2dc96a7a50
Author | SHA1 | Date |
---|---|---|
musitdev | 2dc96a7a50 | |
musitdev | be86bb7414 | |
musitdev | ac0c3615a6 | |
musitdev | 6add71f84c |
|
@ -2,14 +2,13 @@ use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeSto
|
|||
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
||||
use crate::Slot;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::TryFutureExt;
|
||||
use solana_client::client_error::ClientError;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_client::rpc_client::RpcClient;
|
||||
use solana_sdk::account::Account;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
/*
|
||||
|
||||
Bootstrap state changes
|
||||
|
@ -91,15 +90,15 @@ fn process_bootstrap_event(
|
|||
) -> BootsrapProcessResult {
|
||||
match event {
|
||||
BootstrapEvent::InitBootstrap => {
|
||||
let jh = tokio::spawn({
|
||||
let jh = tokio::task::spawn_blocking({
|
||||
let rpc_url = data.rpc_url.clone();
|
||||
let sleep_time = data.sleep_time;
|
||||
async move {
|
||||
move || {
|
||||
log::info!("BootstrapEvent::InitBootstrap RECV");
|
||||
if sleep_time > 0 {
|
||||
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
|
||||
std::thread::sleep(Duration::from_secs(sleep_time));
|
||||
}
|
||||
match crate::bootstrap::bootstrap_accounts(rpc_url).await {
|
||||
match crate::bootstrap::bootstrap_accounts(rpc_url) {
|
||||
Ok((stakes, votes, history)) => {
|
||||
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history)
|
||||
}
|
||||
|
@ -183,65 +182,50 @@ fn process_bootstrap_event(
|
|||
}
|
||||
}
|
||||
|
||||
async fn bootstrap_accounts(
|
||||
fn bootstrap_accounts(
|
||||
rpc_url: String,
|
||||
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account), ClientError> {
|
||||
let furure = get_stake_account(rpc_url)
|
||||
.and_then(|(stakes, rpc_url)| async move {
|
||||
get_vote_account(rpc_url)
|
||||
.await
|
||||
.map(|(votes, rpc_url)| (stakes, votes, rpc_url))
|
||||
get_stake_account(rpc_url)
|
||||
.and_then(|(stakes, rpc_url)| {
|
||||
get_vote_account(rpc_url).map(|(votes, rpc_url)| (stakes, votes, rpc_url))
|
||||
})
|
||||
.and_then(|(stakes, votes, rpc_url)| {
|
||||
get_stakehistory_account(rpc_url).map(|history| (stakes, votes, history))
|
||||
})
|
||||
.and_then(|(stakes, votes, rpc_url)| async move {
|
||||
get_stakehistory_account(rpc_url)
|
||||
.await
|
||||
.map(|history| (stakes, votes, history))
|
||||
});
|
||||
furure.await
|
||||
}
|
||||
|
||||
async fn get_stake_account(
|
||||
rpc_url: String,
|
||||
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
|
||||
fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
|
||||
log::info!("TaskToExec RpcGetStakeAccount start");
|
||||
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
||||
rpc_url.clone(),
|
||||
Duration::from_secs(600),
|
||||
CommitmentConfig::finalized(),
|
||||
);
|
||||
let res_stake = rpc_client
|
||||
.get_program_accounts(&solana_sdk::stake::program::id())
|
||||
.await;
|
||||
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id());
|
||||
log::info!("TaskToExec RpcGetStakeAccount END");
|
||||
res_stake.map(|stake| (stake, rpc_url))
|
||||
}
|
||||
|
||||
async fn get_vote_account(
|
||||
rpc_url: String,
|
||||
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
|
||||
fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
|
||||
log::info!("TaskToExec RpcGetVoteAccount start");
|
||||
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
||||
rpc_url.clone(),
|
||||
Duration::from_secs(600),
|
||||
CommitmentConfig::finalized(),
|
||||
);
|
||||
let res_vote = rpc_client
|
||||
.get_program_accounts(&solana_sdk::vote::program::id())
|
||||
.await;
|
||||
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id());
|
||||
log::info!("TaskToExec RpcGetVoteAccount END");
|
||||
res_vote.map(|votes| (votes, rpc_url))
|
||||
}
|
||||
|
||||
async fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
|
||||
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
|
||||
log::info!("TaskToExec RpcGetStakeHistory start");
|
||||
let rpc_client = RpcClient::new_with_timeout_and_commitment(
|
||||
rpc_url,
|
||||
Duration::from_secs(600),
|
||||
CommitmentConfig::finalized(),
|
||||
);
|
||||
let res_stake = rpc_client
|
||||
.get_account(&solana_sdk::sysvar::stake_history::id())
|
||||
.await;
|
||||
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
|
||||
log::info!("TaskToExec RpcGetStakeHistory END",);
|
||||
res_stake
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ use crate::Slot;
|
|||
use solana_account_decoder::parse_sysvar::SysvarAccountType;
|
||||
use solana_client::client_error::ClientError;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_program::epoch_schedule::EpochSchedule;
|
||||
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
||||
use solana_sdk::epoch_info::EpochInfo;
|
||||
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
||||
|
@ -45,6 +44,7 @@ impl CurrentEpochSlotState {
|
|||
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
|
||||
|
||||
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
|
||||
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
|
||||
let res_epoch = rpc_client
|
||||
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
|
||||
.await?;
|
||||
|
|
|
@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
|
|||
|
||||
const SCHEDULE_STAKE_BASE_FILE_NAME: &str = "aggregate_export_votestake";
|
||||
|
||||
pub const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
|
||||
//pub const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct CalculatedSchedule {
|
||||
|
@ -152,6 +152,7 @@ InitLeaderscedule MergeStore(stakes, votes, schedule)
|
|||
*/
|
||||
|
||||
pub fn run_leader_schedule_events(
|
||||
rpc_url: String,
|
||||
event: LeaderScheduleEvent,
|
||||
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
|
||||
stakestore: &mut StakeStore,
|
||||
|
@ -160,14 +161,14 @@ pub fn run_leader_schedule_events(
|
|||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
||||
EpochInfo,
|
||||
)> {
|
||||
let result = process_leadershedule_event(event, stakestore, votestore);
|
||||
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
|
||||
match result {
|
||||
LeaderScheduleResult::TaskHandle(jh) => {
|
||||
bootstrap_tasks.push(jh);
|
||||
None
|
||||
}
|
||||
LeaderScheduleResult::Event(event) => {
|
||||
run_leader_schedule_events(event, bootstrap_tasks, stakestore, votestore)
|
||||
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
|
||||
}
|
||||
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
|
||||
}
|
||||
|
@ -195,6 +196,7 @@ enum LeaderScheduleResult {
|
|||
|
||||
//TODO remove desactivated account after leader schedule calculus.
|
||||
fn process_leadershedule_event(
|
||||
rpc_url: String,
|
||||
event: LeaderScheduleEvent,
|
||||
stakestore: &mut StakeStore,
|
||||
votestore: &mut VoteStore,
|
||||
|
@ -204,6 +206,20 @@ fn process_leadershedule_event(
|
|||
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
|
||||
//For test TODO put in extract and restore process to avoid to clone.
|
||||
let stake_history = stakestore.get_cloned_stake_history();
|
||||
//TODO get a way to be updated of stake history.
|
||||
//request the current state using RPC.
|
||||
//TODO remove the await from the scheduler task.
|
||||
let before_stake_histo = stake_history.clone();
|
||||
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
|
||||
.map(|account| crate::stakestore::read_historystake_from_account(account))
|
||||
.unwrap_or_else(|_| {
|
||||
log::error!("Error during stake history fetch. Use bootstrap one");
|
||||
stake_history
|
||||
});
|
||||
log::info!(
|
||||
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
|
||||
schedule_epoch.epoch
|
||||
);
|
||||
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
||||
(Ok(stake_map), Ok(vote_map)) => {
|
||||
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
|
||||
|
@ -287,6 +303,58 @@ fn process_leadershedule_event(
|
|||
}
|
||||
}
|
||||
|
||||
// fn calculate_epoch_stakes(stake_map: &StakeMap) {
|
||||
// let stake_history_entry =
|
||||
|
||||
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
|
||||
// // Wrap up the prev epoch by adding new stake history entry for the
|
||||
// // prev epoch.
|
||||
// let stake_history_entry = thread_pool.install(|| {
|
||||
// stake_delegations
|
||||
// .par_iter()
|
||||
// .fold(StakeActivationStatus::default, |acc, stake_account| {
|
||||
// let delegation = stake_account.delegation();
|
||||
// acc + delegation.stake_activating_and_deactivating(
|
||||
// self.epoch,
|
||||
// Some(&self.stake_history),
|
||||
// new_rate_activation_epoch,
|
||||
// )
|
||||
// })
|
||||
// .reduce(StakeActivationStatus::default, Add::add)
|
||||
// });
|
||||
// self.stake_history.add(self.epoch, stake_history_entry);
|
||||
// self.epoch = next_epoch;
|
||||
// // Refresh the stake distribution of vote accounts for the next epoch,
|
||||
// // using new stake history.
|
||||
// let delegated_stakes = thread_pool.install(|| {
|
||||
// stake_delegations
|
||||
// .par_iter()
|
||||
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
|
||||
// let delegation = stake_account.delegation();
|
||||
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
|
||||
// *entry += delegation.stake(
|
||||
// self.epoch,
|
||||
// Some(&self.stake_history),
|
||||
// new_rate_activation_epoch,
|
||||
// );
|
||||
// delegated_stakes
|
||||
// })
|
||||
// .reduce(HashMap::default, merge)
|
||||
// });
|
||||
// self.vote_accounts = self
|
||||
// .vote_accounts
|
||||
// .iter()
|
||||
// .map(|(&vote_pubkey, vote_account)| {
|
||||
// let delegated_stake = delegated_stakes
|
||||
// .get(&vote_pubkey)
|
||||
// .copied()
|
||||
// .unwrap_or_default();
|
||||
// (vote_pubkey, (delegated_stake, vote_account.clone()))
|
||||
// })
|
||||
// .collect();
|
||||
|
||||
// }
|
||||
|
||||
fn calculate_leader_schedule_from_stake_map(
|
||||
stake_map: &crate::stakestore::StakeMap,
|
||||
vote_map: &crate::votestore::VoteMap,
|
||||
|
@ -307,7 +375,7 @@ fn calculate_leader_schedule_from_stake_map(
|
|||
for storestake in stake_map.values() {
|
||||
//get nodeid for vote account
|
||||
let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
|
||||
log::warn!(
|
||||
log::info!(
|
||||
"Vote account not found in vote map for stake vote account:{}",
|
||||
&storestake.stake.voter_pubkey
|
||||
);
|
||||
|
@ -329,7 +397,7 @@ fn calculate_leader_schedule_from_stake_map(
|
|||
.absolute_slot
|
||||
.saturating_sub(ten_epoch_slot_long)
|
||||
{
|
||||
log::warn!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||
, storestake.stake.voter_pubkey
|
||||
,vote_account.vote_data.node_pubkey
|
||||
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||
|
|
|
@ -259,6 +259,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
let mut stake_verification_sender =
|
||||
crate::stakestore::start_stake_verification_loop(RPC_URL.to_string()).await;
|
||||
|
||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
||||
let mut parent_block_slot = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(req) = request_rx.recv() => {
|
||||
|
@ -330,6 +333,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
//Manage RPC call result execution
|
||||
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
|
||||
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
|
||||
RPC_URL.to_string(),
|
||||
event,
|
||||
&mut spawned_leader_schedule_task,
|
||||
&mut stakestore,
|
||||
|
@ -432,6 +436,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
if bootstrap_data.done {
|
||||
if let Some(init_event) = schedule_event {
|
||||
crate::leader_schedule::run_leader_schedule_events(
|
||||
RPC_URL.to_string(),
|
||||
init_event,
|
||||
&mut spawned_leader_schedule_task,
|
||||
&mut stakestore,
|
||||
|
@ -445,7 +450,23 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
|
||||
}
|
||||
Some(UpdateOneof::Block(block)) => {
|
||||
log::trace!("Receive Block at slot: {}", block.slot);
|
||||
log::trace!("Receive Block at slot: {} hash:{} parent_slot:{}",
|
||||
block.slot,
|
||||
block.blockhash,
|
||||
block.parent_slot,
|
||||
);
|
||||
|
||||
//TODO remove; Detect missing block
|
||||
if let Some(parent_block_slot) = parent_block_slot {
|
||||
if parent_block_slot != block.parent_slot {
|
||||
log::error!("Bad parent slot stored:{} block:{}, miss a block"
|
||||
,parent_block_slot,block.parent_slot
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
parent_block_slot = Some(block.slot);
|
||||
|
||||
//parse to detect stake merge tx.
|
||||
//first in the main thread then in a specific thread.
|
||||
let stake_public_key: Vec<u8> = solana_sdk::stake::program::id().to_bytes().to_vec();
|
||||
|
|
Loading…
Reference in New Issue