add schedule verification and test

musitdev 2023-08-12 10:58:41 +02:00
parent eae5f5363d
commit 92ae167679
3 changed files with 154 additions and 49 deletions

View File

@@ -1,14 +1,19 @@
use anyhow::bail;
use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::str::FromStr;
pub fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
current_epoch_info: &EpochInfo,
) -> LeaderSchedule {
) -> anyhow::Result<LeaderSchedule> {
let mut stakes = HashMap::<Pubkey, u64>::new();
log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
for storestake in stake_map.values() {
// Ignore stake accounts activated in this epoch (or later, to include activation_epoch of
// u64::MAX which indicates no activation ever happened)
@@ -30,7 +35,10 @@ pub fn calculate_leader_schedule_from_stake_map(
fn calculate_leader_schedule(
stakes: HashMap<Pubkey, u64>,
current_epoch_info: &EpochInfo,
) -> LeaderSchedule {
) -> anyhow::Result<LeaderSchedule> {
if stakes.is_empty() {
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
}
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&current_epoch_info.epoch.to_le_bytes());
let mut stakes: Vec<_> = stakes
@@ -38,12 +46,13 @@ fn calculate_leader_schedule(
.map(|(pubkey, stake)| (*pubkey, *stake))
.collect();
sort_stakes(&mut stakes);
LeaderSchedule::new(
log::trace!("calculate_leader_schedule stakes:{stakes:?}");
Ok(LeaderSchedule::new(
&stakes,
seed,
current_epoch_info.slots_in_epoch,
NUM_CONSECUTIVE_LEADER_SLOTS,
)
))
}
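A small worked example of the seed derivation above (editor's sketch, not part of this commit): the epoch index is little-endian encoded into the first 8 bytes of the 32-byte seed and the remaining bytes stay zero, so every node computing the schedule for the same epoch shuffles with the same deterministic seed.
#[test]
fn seed_is_epoch_in_first_eight_le_bytes() {
    // Epoch 500 is 0x01F4, so the seed starts with 0xF4, 0x01 and is zero elsewhere.
    let epoch: u64 = 500;
    let mut seed = [0u8; 32];
    seed[0..8].copy_from_slice(&epoch.to_le_bytes());
    let expected: [u8; 8] = [0xF4, 0x01, 0, 0, 0, 0, 0, 0];
    assert_eq!(seed[0..8], expected[..]);
    assert!(seed[8..].iter().all(|&b| b == 0));
}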
// Cribbed from leader_schedule_utils
@@ -62,3 +71,51 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Now that it's sorted, we can do an O(n) dedup.
stakes.dedup();
}
pub fn verify_schedule(schedule: LeaderSchedule, rpc_url: String) -> anyhow::Result<()> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
let Some(mut leader_schedule_finalized) = rpc_client.get_leader_schedule(None)? else {
log::info!("verify_schedule RPC return no schedule. Try later.");
return Ok(());
};
//map the leader schedule to a HashMap<Pubkey, Vec<Slot>>
let mut input_leader_schedule: HashMap<Pubkey, Vec<usize>> = HashMap::new();
for (slot, pubkey) in schedule.get_slot_leaders().into_iter().copied().enumerate() {
input_leader_schedule
.entry(pubkey)
.or_insert(vec![])
.push(slot);
}
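//for each vote account of the calculated schedule, compare its sorted slot list with the RPC one and collect every account that is missing from the RPC schedule or whose slots differ.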
let mut vote_account_in_error: Vec<Pubkey> = input_leader_schedule.into_iter().filter_map(|(input_vote_key, mut input_slot_list)| {
let Some(mut rpc_slot_list) = leader_schedule_finalized.remove(&input_vote_key.to_string()) else {
log::warn!("verify_schedule vote account not found in RPC:{input_vote_key}");
return Some(input_vote_key);
};
input_slot_list.sort();
rpc_slot_list.sort();
if input_slot_list != rpc_slot_list {
log::warn!("verify_schedule bad slots for {input_vote_key} calculated:{input_slot_list:?} rpc:{rpc_slot_list:?}");
Some(input_vote_key)
} else {
None
}
}).collect();
if !leader_schedule_finalized.is_empty() {
log::warn!(
"verify_schedule RPC vote account not present in calculated schedule:{:?}",
leader_schedule_finalized.keys()
);
vote_account_in_error.append(
&mut leader_schedule_finalized
.keys()
.map(|sk| Pubkey::from_str(&sk).unwrap())
.collect::<Vec<Pubkey>>(),
);
}
log::info!("verify_schedule these account are wrong:{vote_account_in_error:?}");
Ok(())
}
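How the two new entry points are meant to compose (editor's sketch, not part of the diff; the `build_and_verify` helper is hypothetical, and in this repo the stake map, epoch info and RPC URL come from main.rs):
pub fn build_and_verify(
    stake_map: &crate::stakestore::StakeMap,
    epoch: &EpochInfo,
    rpc_url: String,
) -> anyhow::Result<()> {
    // Bails if the stake map is empty (see calculate_leader_schedule above).
    let schedule = calculate_leader_schedule_from_stake_map(stake_map, epoch)?;
    // Cross-checks the calculated schedule against the RPC node's own schedule.
    verify_schedule(schedule, rpc_url)
}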

View File

@@ -9,7 +9,6 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use std::collections::HashMap;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
@@ -25,8 +24,12 @@ mod stakestore;
type Slot = u64;
//WebSocket URL: ws://localhost:8900/ (computed)
const GRPC_URL: &str = "http://127.0.0.0:10000";
const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
const RPC_URL: &str = "http://localhost:8899";
//const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
//const RPC_URL: &str = "https://api.testnet.solana.com";
//const RPC_URL: &str = "https://api.devnet.solana.com";
@@ -57,6 +60,23 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<crate::stakestore::StakeMap> {
let new_store = std::mem::take(stakestore);
let (new_store, stake_map) = new_store.extract()?;
*stakestore = new_store;
Ok(stake_map)
}
fn merge_stakestore(
stakestore: &mut StakeStore,
stake_map: crate::stakestore::StakeMap,
) -> anyhow::Result<()> {
let new_store = std::mem::take(stakestore);
let new_store = new_store.merge_stake(stake_map)?;
*stakestore = new_store;
Ok(())
}
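The two helpers above encode an extract → work → merge cycle: while the stake map is taken out of the store, incoming Geyser updates are buffered (see add_stake in stakestore.rs below), presumably to be re-applied when the map is merged back. A minimal sketch of that intended usage; the `with_extracted_stakes` wrapper is hypothetical, not something this commit defines:
fn with_extracted_stakes(
    store: &mut StakeStore,
    work: impl FnOnce(&mut crate::stakestore::StakeMap),
) -> anyhow::Result<()> {
    // Fails if an extraction is already in progress.
    let mut stake_map = extract_stakestore(store)?;
    work(&mut stake_map);
    // Give the map back; buffered updates are presumably replayed here.
    merge_stakestore(store, stake_map)
}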
async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
//local vars
let mut current_slot: CurrentSlot = Default::default();
@@ -109,11 +129,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
let jh = tokio::spawn(async move {
match to_exec {
TaskToExec::RpcGetPa => {
log::trace!("TaskToExec RpcGetPa start");
let rpc_client = RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized());
let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await;
TaskResult::RpcGetPa(res)
},
TaskToExec::RpcGetCurrentEpoch => {
log::trace!("TaskToExec RpcGetCurrentEpoch start");
let rpc_client = RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized());
let res = rpc_client.get_epoch_info().await;
TaskResult::CurrentEpoch(res)
@@ -126,15 +148,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
Some(some_res) = spawned_task_result.next() => {
match some_res {
Ok(TaskResult::RpcGetPa(Ok(pa_list))) => {
let new_store = std::mem::take(&mut stakestore);
let Ok((new_store, mut stake_map)) = new_store.extract() else {
let Ok(mut stake_map) = extract_stakestore(&mut stakestore) else {
//retry later, the epoch schedule is currently being processed
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
continue;
};
stakestore = new_store;
//merge the new program accounts into the stake map in a dedicated thread
log::trace!("Run_loop before Program account stake merge");
log::trace!("Run_loop before Program account stake merge START");
let jh = tokio::task::spawn_blocking(|| {
crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list);
@@ -147,17 +167,50 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
current_epoch = epoch_info;
}
Ok(TaskResult::MergePAList(stake_map)) => {
let new_store = std::mem::take(&mut stakestore);
let Ok(new_store) = new_store.merge_stake(stake_map) else {
//should never occurs because only getPA can merge and only one occurs at time.
if let Err(err) = merge_stakestore(&mut stakestore, stake_map) {
//should never occur because only one extract can be in progress at a time.
// during PA processing no epoch schedule can be computed.
log::warn!("merge stake on a non extract stake map");
log::warn!("merge stake on a non extract stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
continue;
};
stakestore = new_store;
log::trace!("Run_loop end Program account stake merge");
log::trace!("Run_loop Program account stake merge END");
//TODO REMOVE
//Test only: recompute the schedule here so verification can be exercised.
let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a getPA is currently running.");
continue;
};
let move_epoch = current_epoch.clone();
let jh = tokio::task::spawn_blocking(move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
TaskResult::ScheduleResult(schedule.ok(), stake_map)
});
spawned_task_result.push(jh);
//end test
}
Ok(TaskResult::ScheduleResult(schedule_opt, stake_map)) => {
//merge stake
if let Err(err) = merge_stakestore(&mut stakestore, stake_map) {
//should never occur because only one extract can be in progress at a time.
// during PA processing no epoch schedule can be computed.
log::warn!("merge stake on a non extract stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
continue;
};
//verify the calculated schedule against the one returned by the RPC.
if let Some(schedule) = schedule_opt {
tokio::task::spawn_blocking(|| {
if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
log::warn!("Error during schedule verification:{err}");
}
});
}
}
_ => log::warn!("RPC call return invalid result: {some_res:?}"),
}
@@ -171,11 +224,12 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
//process the message
match message {
Ok(msg) => {
log::info!("new message: {msg:?}");
//log::info!("new message: {msg:?}");
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
//store new account stake.
if let Some(account) = read_account(account, current_slot.confirmed_slot) {
log::trace!("Geyser receive new account");
if let Err(err) = stakestore.add_stake(account) {
log::warn!("Can't add new stake from account data err:{}", err);
continue;
@@ -184,23 +238,21 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
}
Some(UpdateOneof::Slot(slot)) => {
//update current slot
log::info!("Processing slot: {:?}", slot);
//log::info!("Processing slot: {:?}", slot);
current_slot.update_slot(&slot);
if current_slot.confirmed_slot == current_epoch.slot_index + current_epoch.slots_in_epoch {
log::info!("End epoch slot. Calculate schedule.");
let new_store = std::mem::take(&mut stakestore);
let Ok((new_store, stake_map)) = new_store.extract() else {
let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a getPA is currently running.");
continue;
};
stakestore = new_store;
//calculate schedule in a dedicated thread.
let move_epoch = current_epoch.clone();
let jh = tokio::task::spawn_blocking(move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
TaskResult::ScheduleResult(schedule)
TaskResult::ScheduleResult(schedule.ok(), stake_map)
});
spawned_task_result.push(jh);
@@ -212,6 +264,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
}
}
Some(UpdateOneof::Ping(_)) => (),
bad_msg => {
log::info!("Geyser stream unexpected message received:{:?}",bad_msg);
}
@@ -273,20 +326,6 @@ impl AccountPretty {
}
}
// fn update_stakes(stakes: &mut StakeStore, stake: Delegation, current_epoch: u64) {
// // Ignore stake accounts activated in this epoch (or later, to include activation_epoch of
// // u64::MAX which indicates no activation ever happened)
// if stake.activation_epoch >= current_epoch {
// return;
// }
// // Ignore stake accounts deactivated before this epoch
// if stake.deactivation_epoch < current_epoch {
// return;
// }
// // Add the stake in this stake account to the total for the delegated-to vote account
// *(stakes.entry(stake.voter_pubkey.clone()).or_insert(0)) += stake.stake;
// }
fn read_account(
geyser_account: SubscribeUpdateAccount,
current_slot: u64,
@@ -328,5 +367,5 @@ enum TaskResult {
RpcGetPa(Result<Vec<(Pubkey, Account)>, ClientError>),
CurrentEpoch(Result<EpochInfo, ClientError>),
MergePAList(crate::stakestore::StakeMap),
ScheduleResult(LeaderSchedule),
ScheduleResult(Option<LeaderSchedule>, crate::stakestore::StakeMap),
}
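The reworked ScheduleResult variant now carries the extracted stake map alongside the optional schedule, so the loop can always merge the map back even when the schedule computation failed. A condensed sketch of that handling, mirroring the run_loop arms above (editor's illustration, not additional code from the commit; error handling is simplified to `?`):
fn handle_schedule_result(
    store: &mut StakeStore,
    schedule: Option<LeaderSchedule>,
    stake_map: crate::stakestore::StakeMap,
) -> anyhow::Result<()> {
    // Return the stake map to the store first, whatever the schedule outcome.
    merge_stakestore(store, stake_map)?;
    // Only verify when a schedule was actually computed.
    if let Some(schedule) = schedule {
        crate::leader_schedule::verify_schedule(schedule, RPC_URL.to_string())?;
    }
    Ok(())
}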

View File

@@ -1,10 +1,10 @@
use crate::AccountPretty;
use crate::Slot;
use crate::StakeState;
use anyhow::bail;
use borsh::BorshDeserialize;
use solana_sdk::account::Account;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use std::collections::HashMap;
use crate::Pubkey;
@@ -17,11 +17,13 @@ fn stake_map_insert_stake(map: &mut StakeMap, stake_account: Pubkey, stake: StoredStake) {
std::collections::hash_map::Entry::Occupied(occupied) => {
let strstake = occupied.into_mut(); // <-- get mut reference to existing value
if strstake.last_update_slot < stake.last_update_slot {
log::trace!("Stake updated for: {stake_account} stake:{stake:?}");
*strstake = stake;
}
}
// If no entry exists yet, insert the new stake
std::collections::hash_map::Entry::Vacant(vacant) => {
log::trace!("New stake added for: {stake_account} stake:{stake:?}");
vacant.insert(stake);
}
};
@@ -83,19 +85,22 @@ impl StakeStore {
}
pub fn add_stake(&mut self, new_account: AccountPretty) -> anyhow::Result<()> {
let Ok(Some(delegated_stake)) = new_account.read_stake() else {
let Ok(delegated_stake_opt) = new_account.read_stake() else {
bail!("Can't read stake from account data");
};
let ststake = StoredStake {
stake: delegated_stake,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
match self.extracted {
true => self.updates.push((new_account.pubkey, ststake)),
false => self.insert_stake(new_account.pubkey, ststake),
if let Some(delegated_stake) = delegated_stake_opt {
let ststake = StoredStake {
stake: delegated_stake,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
match self.extracted {
true => self.updates.push((new_account.pubkey, ststake)),
false => self.insert_stake(new_account.pubkey, ststake),
}
}
Ok(())
}
@@ -131,10 +136,14 @@ pub fn merge_program_account_in_strake_map(
pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<Delegation>> {
if data.is_empty() {
bail!("Error: read strake of PA account with empty data");
log::warn!("read stake from account empty stake account.");
bail!("Error: read stake of PA account with empty data");
}
match StakeState::deserialize(&mut data)? {
StakeState::Stake(_, stake) => Ok(Some(stake.delegation)),
_ => Ok(None),
StakeState::Initialized(_) => Ok(None),
other => {
bail!("read stake from account not a stake account. read:{other:?}");
}
}
}
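With the new contract of read_stake_from_account_data, `Ok(None)` flags an initialized but not yet delegated stake account and is silently skipped by add_stake, while any other non-stake state is an error. A small consumer sketch (the `delegation_of` helper is hypothetical):
fn delegation_of(account: &Account) -> Option<Delegation> {
    match read_stake_from_account_data(&account.data) {
        Ok(delegation_opt) => delegation_opt,
        Err(err) => {
            // Uninitialized or otherwise non-stake data ends up here.
            log::warn!("skipping account: {err}");
            None
        }
    }
}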