change vote account to node id for schedule calculation

This commit is contained in:
musitdev 2023-09-09 11:55:28 +02:00
parent b68890cd1f
commit c024ca6f26
3 changed files with 64 additions and 26 deletions

View File

@ -14,15 +14,26 @@ const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
pub fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
current_epoch_info: &EpochInfo,
) -> anyhow::Result<LeaderSchedule> {
let mut stakes = HashMap::<Pubkey, u64>::new();
log::trace!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{}",
vote_map.len(),
stake_map.len()
);
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
for storestake in stake_map.values() {
//log::info!("Program_accounts stake:{stake:#?}");
if is_stake_to_add(storestake.pubkey, &storestake.stake, &current_epoch_info) {
// Add the stake in this stake account to the total for the delegated-to vote account
*(stakes.entry(storestake.stake.voter_pubkey).or_insert(0)) += storestake.stake.stake;
//get nodeid for vote account
let Some(nodeid) = vote_map.get(&storestake.stake.voter_pubkey).map(|v| v.vote_data.node_pubkey) else {
log::warn!("Vote account not found in vote map for stake vote account:{}", &storestake.stake.voter_pubkey);
continue;
};
*(stakes.entry(nodeid).or_insert(0)) += storestake.stake.stake;
}
}
calculate_leader_schedule(stakes, current_epoch_info)
@ -97,25 +108,26 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
}
pub fn verify_schedule(schedule: LeaderSchedule, rpc_url: String) -> anyhow::Result<()> {
log::info!("verify_schedule Start.");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::confirmed(),
);
let Some(rpc_leader_schedule) = rpc_client.get_leader_schedule(None)? else {
let Some(mut rpc_leader_schedule) = rpc_client.get_leader_schedule(None)? else {
log::info!("verify_schedule RPC return no schedule. Try later.");
return Ok(());
};
log::info!("");
let vote_account = rpc_client.get_vote_accounts()?;
let node_vote_table = vote_account
.current
.into_iter()
.chain(vote_account.delinquent.into_iter())
.map(|va| (va.node_pubkey, va.vote_pubkey))
.collect::<HashMap<String, String>>();
// let vote_account = rpc_client.get_vote_accounts()?;
// let node_vote_table = vote_account
// .current
// .into_iter()
// .chain(vote_account.delinquent.into_iter())
// .map(|va| (va.node_pubkey, va.vote_pubkey))
// .collect::<HashMap<String, String>>();
//log::info!("node_vote_table:{node_vote_table:?}");
@ -133,13 +145,13 @@ pub fn verify_schedule(schedule: LeaderSchedule, rpc_url: String) -> anyhow::Res
// }
//map rpc leader schedule node pubkey to vote account
let mut rpc_leader_schedule: HashMap<String, Vec<usize>> = rpc_leader_schedule.into_iter().filter_map(|(pk, slots)| match node_vote_table.get(&pk) {
Some(vote_account) => Some((vote_account.clone(),slots)),
None => {
log::warn!("verify_schedule RPC get_leader_schedule return some Node account:{pk} that are not mapped by rpc get_vote_accounts");
None
},
}).collect();
// let mut rpc_leader_schedule: HashMap<String, Vec<usize>> = rpc_leader_schedule.into_iter().filter_map(|(pk, slots)| match node_vote_table.get(&pk) {
// Some(vote_account) => Some((vote_account.clone(),slots)),
// None => {
// log::warn!("verify_schedule RPC get_leader_schedule return some Node account:{pk} that are not mapped by rpc get_vote_accounts");
// None
// },
// }).collect();
// if let Err(err) = save_schedule_on_file("rpc", &rpc_leader_schedule) {
// log::error!("Error during saving generated schedule:{err}");

View File

@ -365,22 +365,38 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch));
}
}
Ok(TaskResult::ScheduleResult(schedule_opt, stake_map)) => {
Ok(TaskResult::ScheduleResult(schedule_opt, stake_map, vote_map)) => {
//merge stake
if let Err(err) = merge_stakestore(&mut stakestore, stake_map, &current_epoch) {
let merge_error = match merge_stakestore(&mut stakestore, stake_map, &current_epoch) {
Ok(()) => false,
Err(err) => {
//should never occurs because only one extract can occurs at time.
// during PA no epoch schedule can be done.
log::warn!("merge stake on a non extract stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(0, 0)));
true
}
};
//merge vote
if let Err(err) = merge_votestore(&mut votestore, vote_map) {
//should never occurs because only one extract can occurs at time.
// during PA no epoch schedule can be done.
log::warn!("merge stake on a non extract stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(0, 0)));
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(0, 0)));
continue;
};
if merge_error {
continue;
}
//verify calculated shedule with the one the RPC return.
if let Some(schedule) = schedule_opt {
tokio::task::spawn_blocking(|| {
//10 second that the schedule has been calculated on the validator
std::thread::sleep(std::time::Duration::from_secs(60));
std::thread::sleep(std::time::Duration::from_secs(20));
log::info!("Start Verify schedule");
if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
log::warn!("Error during schedule verification:{err}");
@ -457,12 +473,19 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
log::info!("End epoch slot, change epoch. Calculate schedule at current slot:{}", current_slot.confirmed_slot);
let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a getPA is currently running.");
log::info!("Epoch schedule aborted because a extract_stakestore faild.");
continue;
};
let Ok(vote_map) = extract_votestore(&mut votestore) else {
log::info!("Epoch schedule aborted because extract_votestore faild.");
//cancel stake extraction
merge_stakestore(&mut stakestore, stake_map, &current_epoch).unwrap(); //just extracted.
continue;
};
//reload PA account for new epoch start. TODO replace with bootstrap.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(next_epoch_start_slot, 30))); //Wait 30s to get new PA.
//spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(next_epoch_start_slot, 30))); //Wait 30s to get new PA.
//change epoch. Change manually then update using RPC.
current_epoch.epoch +=1;
@ -475,9 +498,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let jh = tokio::task::spawn_blocking({
let move_epoch = current_epoch.clone();
move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &vote_map, &move_epoch);
log::info!("End calculate leader schedule at slot:{}", current_slot.confirmed_slot);
TaskResult::ScheduleResult(schedule.ok(), stake_map)
TaskResult::ScheduleResult(schedule.ok(), stake_map, vote_map)
}
});
spawned_task_result.push(jh);
@ -641,5 +664,9 @@ enum TaskResult {
CurrentEpoch(Result<EpochInfo, ClientError>),
MergeStakeList(crate::stakestore::StakeMap),
MergeVoteList(crate::votestore::VoteMap),
ScheduleResult(Option<LeaderSchedule>, crate::stakestore::StakeMap),
ScheduleResult(
Option<LeaderSchedule>,
crate::stakestore::StakeMap,
crate::votestore::VoteMap,
),
}

View File

@ -2,7 +2,6 @@ use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
use solana_sdk::account::Account;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;