add stake and schedule bootstrap

This commit is contained in:
musitdev 2023-10-18 09:34:16 +02:00
parent b6cef88fa9
commit 952b0c866c
7 changed files with 422 additions and 46 deletions

View File

@ -1,13 +1,29 @@
use crate::epoch::ScheduleEpochData;
use crate::stake::StakeMap;
use crate::stake::StakeStore;
use crate::vote::VoteMap;
use crate::vote::VoteStore;
use anyhow::bail;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use std::sync::Arc;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake_history::StakeHistory;
use std::time::Duration;
use tokio::task::JoinHandle;
pub async fn bootstrap_process_data(
data_cache: &DataCache,
rpc_client: Arc<RpcClient>,
) -> (ScheduleEpochData, BootstrapData) {
//File where the Vote and stake use to calculate the leader schedule at epoch are stored.
//Use to bootstrap current and next epoch leader schedule.
//TODO to be removed with inter RPC bootstrap and snapshot read.
pub const CURRENT_EPOCH_VOTE_STAKES_FILE: &str = "current_vote_stakes.json";
pub const NEXT_EPOCH_VOTE_STAKES_FILE: &str = "next_vote_stakes.json";
pub async fn bootstrap_scheduleepoch_data(data_cache: &DataCache) -> ScheduleEpochData {
let new_rate_activation_epoch = solana_sdk::feature_set::FeatureSet::default()
.new_warmup_cooldown_rate_epoch(data_cache.epoch_data.get_epoch_schedule());
@ -22,25 +38,269 @@ pub async fn bootstrap_process_data(
new_rate_activation_epoch,
};
//Init bootstrap process
let bootstrap_data = crate::bootstrap::BootstrapData {
done: false,
sleep_time: 1,
rpc_client,
};
(current_schedule_epoch, bootstrap_data)
current_schedule_epoch
}
pub fn bootstrap_leader_schedule(
current_file_patch: &str,
next_file_patch: &str,
/*
Bootstrap state changes
InitBootstrap
|
|Fetch accounts|
| |
Error BootstrapAccountsFetched(account list)
| |
|Exit| |Extract stores|
| |
Error StoreExtracted(account list, stores)
| |
| Wait(1s)| |Merge accounts in store|
| | |
BootstrapAccountsFetched(account list) Error AccountsMerged(stores)
| |
|Log and skip| |Merges store|
|Account | | |
Error End
|
|never occurs restart|
|
InitBootstrap
*/
pub fn run_bootstrap_events(
event: BootstrapEvent,
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<BootstrapEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> anyhow::Result<Option<bool>> {
let result = process_bootstrap_event(event, stakestore, votestore);
match result {
BootsrapProcessResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
Ok(None)
}
BootsrapProcessResult::Event(event) => {
run_bootstrap_events(event, bootstrap_tasks, stakestore, votestore)
}
BootsrapProcessResult::End => Ok(Some(true)),
BootsrapProcessResult::Error(err) => bail!(err),
}
}
pub enum BootstrapEvent {
InitBootstrap {
sleep_time: u64,
rpc_url: String,
},
BootstrapAccountsFetched(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Account,
String,
),
StoreExtracted(
StakeMap,
VoteMap,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Account,
String,
),
AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap, String),
Exit,
}
enum BootsrapProcessResult {
TaskHandle(JoinHandle<BootstrapEvent>),
Event(BootstrapEvent),
Error(String),
End,
}
fn process_bootstrap_event(
event: BootstrapEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap {
sleep_time,
rpc_url,
} => {
let jh = tokio::task::spawn_blocking(move || {
log::info!("BootstrapEvent::InitBootstrap RECV");
if sleep_time > 0 {
std::thread::sleep(Duration::from_secs(sleep_time));
}
match crate::bootstrap::bootstrap_accounts(rpc_url.clone()) {
Ok((stakes, votes, history)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url)
}
Err(err) => {
log::warn!(
"Bootstrap account error during fetching accounts err:{err}. Exit"
);
BootstrapEvent::Exit
}
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (
StakeStore::take_stakestore(stakestore),
VoteStore::take_votestore(votestore),
) {
(Ok((stake_map, _)), Ok(vote_map)) => {
BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted(
stake_map, vote_map, stakes, votes, history, rpc_url,
))
}
_ => {
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url)
});
BootsrapProcessResult::TaskHandle(jh)
}
}
}
BootstrapEvent::StoreExtracted(
mut stake_map,
mut vote_map,
stakes,
votes,
history,
rpc_url,
) => {
log::info!("BootstrapEvent::StoreExtracted RECV");
let stake_history = crate::account::read_historystake_from_account(history);
if stake_history.is_none() {
return BootsrapProcessResult::Error(
"Bootstrap error, can't read stake history from account data.".to_string(),
);
}
//merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({
move || {
//update pa_list to set slot update to start epoq one.
crate::stake::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
0, //with RPC no way to know the slot of the account update. Set to 0.
);
crate::vote::merge_program_account_in_vote_map(
&mut vote_map,
votes,
0, //with RPC no way to know the slot of the account update. Set to 0.
);
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map, rpc_url)
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map, rpc_url) => {
log::info!("BootstrapEvent::AccountsMerged RECV");
match (
StakeStore::merge_stakestore(stakestore, stake_map, stake_history),
VoteStore::merge_votestore(votestore, vote_map),
) {
(Ok(()), Ok(())) => BootsrapProcessResult::End,
_ => {
//TODO remove this error using type state
log::warn!("BootstrapEvent::AccountsMerged merge stake or vote fail, non extracted stake/vote map err, restart bootstrap");
BootsrapProcessResult::Event(BootstrapEvent::InitBootstrap {
sleep_time: 10,
rpc_url,
})
}
}
}
BootstrapEvent::Exit => panic!("Bootstrap account can't be done exit"),
}
}
fn bootstrap_accounts(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account), ClientError> {
get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| {
get_vote_account(rpc_url).map(|(votes, rpc_url)| (stakes, votes, rpc_url))
})
.and_then(|(stakes, votes, rpc_url)| {
get_stakehistory_account(rpc_url).map(|history| (stakes, votes, history))
})
}
fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id());
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake.map(|stake| (stake, rpc_url))
}
fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id());
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote.map(|votes| (votes, rpc_url))
}
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
}
pub fn bootstrap_current_leader_schedule(
slots_in_epoch: u64,
) -> anyhow::Result<CalculatedSchedule> {
todo!();
}
let (current_epoch, current_stakes) =
crate::utils::read_schedule_vote_stakes(CURRENT_EPOCH_VOTE_STAKES_FILE)?;
let (next_epoch, next_stakes) =
crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?;
pub struct BootstrapData {
pub done: bool,
pub sleep_time: u64,
pub rpc_client: Arc<RpcClient>,
//calcualte leader schedule for all vote stakes.
let current_schedule = crate::leader_schedule::calculate_leader_schedule(
&current_stakes,
current_epoch,
slots_in_epoch,
);
let next_schedule =
crate::leader_schedule::calculate_leader_schedule(&next_stakes, next_epoch, slots_in_epoch);
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule: current_schedule,
//TODO use epoch stake for get_vote_accounts
epoch: current_epoch,
}),
next: Some(LeaderScheduleData {
schedule: next_schedule,
//TODO use epoch stake for get_vote_accounts
// vote_stakes: next_stakes.stake_vote_map,
epoch: next_epoch,
}),
})
}

View File

@ -4,23 +4,13 @@ use crate::vote::{VoteMap, VoteStore};
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_program::sysvar::epoch_schedule::EpochSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::feature_set::FeatureSet;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::StakeActivationStatus;
use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
#[derive(Debug)]
@ -125,6 +115,32 @@ fn process_leadershedule_event(
next_epoch,
slots_in_epoch,
);
if std::path::Path::new(crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE)
.exists()
{
if let Err(err) = std::fs::rename(
crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE,
crate::bootstrap::CURRENT_EPOCH_VOTE_STAKES_FILE,
) {
log::error!(
"Fail to rename current leader schedule on disk because :{err}"
);
}
}
//save new vote stake in a file for bootstrap.
if let Err(err) = crate::utils::save_schedule_vote_stakes(
crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE,
&epoch_vote_stakes,
next_epoch,
) {
log::error!(
"Error during saving the new leader schedule of epoch:{} in a file error:{err}",
next_epoch
);
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
@ -187,6 +203,7 @@ fn calculate_epoch_stakes(
mut stake_history: Option<&mut StakeHistory>,
new_rate_activation_epoch: Option<solana_sdk::clock::Epoch>,
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
//code taken from Solana code: runtime::stakes::activate_epoch function
//update stake history with current end epoch stake values.
let stake_history_entry =
stake_map
@ -240,7 +257,7 @@ fn calculate_epoch_stakes(
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
fn calculate_leader_schedule(
pub fn calculate_leader_schedule(
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
slots_in_epoch: u64,

View File

@ -1,4 +1,5 @@
use crate::account::AccountPretty;
use crate::bootstrap::BootstrapEvent;
use crate::leader_schedule::LeaderScheduleGeneratedData;
use futures::Stream;
use futures_util::stream::FuturesUnordered;
@ -36,7 +37,9 @@ pub async fn start_stakes_and_votes_loop(
rpc_client: Arc<RpcClient>,
grpc_url: String,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
log::info!("Start Stake and Vote loop.");
let mut account_gyzer_stream = subscribe_geyzer(grpc_url).await?;
log::info!("Stake and Vote geyzer subscription done.");
let jh = tokio::spawn(async move {
//Stake account management struct
let mut stakestore = stake::StakeStore::new(STAKESTORE_INITIAL_CAPACITY);
@ -45,19 +48,41 @@ pub async fn start_stakes_and_votes_loop(
let mut votestore = vote::VoteStore::new(VOTESTORE_INITIAL_CAPACITY);
//Init bootstrap process
let (mut current_schedule_epoch, bootstrap_data) =
crate::bootstrap::bootstrap_process_data(&data_cache, rpc_client.clone()).await;
let mut current_schedule_epoch =
crate::bootstrap::bootstrap_scheduleepoch_data(&data_cache).await;
match crate::bootstrap::bootstrap_current_leader_schedule(
current_schedule_epoch.slots_in_epoch,
) {
Ok(current_schedule_data) => {
data_cache.leader_schedule = Arc::new(current_schedule_data)
}
Err(err) => {
log::warn!("Error during current leader schedule bootstrap from files:{err}")
}
}
//future execution collection.
let mut spawned_leader_schedule_task = FuturesUnordered::new();
let mut spawned_bootstrap_task = FuturesUnordered::new();
let jh = tokio::spawn(async move {
BootstrapEvent::InitBootstrap {
sleep_time: 1,
rpc_url: rpc_client.url(),
}
});
spawned_bootstrap_task.push(jh);
let mut bootstrap_done = false;
loop {
tokio::select! {
//manage confirm new slot notification to detect epoch change.
Ok(_) = slot_notification.recv() => {
//log::info!("Stake and Vote receive a slot.");
let new_slot = crate::utils::get_current_confirmed_slot(&data_cache).await;
let schedule_event = current_schedule_epoch.process_new_confirmed_slot(new_slot, &data_cache).await;
if bootstrap_data.done {
if bootstrap_done {
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
@ -79,11 +104,16 @@ pub async fn start_stakes_and_votes_loop(
Ok(msg) => {
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
// log::info!("Stake and Vote geyzer receive an account:{}.",
// account.account.clone().map(|a|
// solana_sdk::pubkey::Pubkey::try_from(a.pubkey).map(|k| k.to_string())
// .unwrap_or("bad pubkey".to_string()).to_string())
// .unwrap_or("no content".to_string())
// );
//store new account stake.
let current_slot = crate::utils::get_current_confirmed_slot(&data_cache).await;
if let Some(account) = AccountPretty::new_from_geyzer(account, current_slot) {
//log::trace!("Geyser receive new account");
match account.owner {
solana_sdk::stake::program::ID => {
log::info!("Geyser notif stake account:{}", account);
@ -96,7 +126,7 @@ pub async fn start_stakes_and_votes_loop(
}
}
solana_sdk::vote::program::ID => {
// Generatea lot of logs. log::info!("Geyser notif VOTE account:{}", account);
//log::info!("Geyser notif VOTE account:{}", account);
let account_pubkey = account.pubkey;
//process vote accout notification
if let Err(err) = votestore.add_vote(account, current_schedule_epoch.last_slot_in_epoch) {
@ -109,6 +139,9 @@ pub async fn start_stakes_and_votes_loop(
}
}
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
Some(UpdateOneof::Slot(slot)) => {
log::trace!("Receive slot slot: {slot:?}");
}
bad_msg => {
log::info!("Geyser stream unexpected message received:{:?}", bad_msg);
}
@ -128,6 +161,14 @@ pub async fn start_stakes_and_votes_loop(
}
}
}
//manage bootstrap event
Some(Ok(event)) = spawned_bootstrap_task.next() => {
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore) {
Ok(Some(boot_res))=> bootstrap_done = boot_res,
Ok(None) => (),
Err(err) => log::error!("Stake / Vote Account bootstrap fail because '{err}'"),
}
}
//Manage leader schedule generation process
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
@ -190,7 +231,8 @@ async fn subscribe_geyzer(
let confirmed_stream = client
.subscribe_once(
slots.clone(),
Default::default(), //slots
//slots.clone(),
accounts.clone(), //accounts
Default::default(), //tx
Default::default(), //entry

View File

@ -65,9 +65,9 @@ impl StakeStore {
}
}
pub fn get_stake_history(&self) -> Option<StakeHistory> {
self.stakes.content.1.clone()
}
// pub fn get_stake_history(&self) -> Option<StakeHistory> {
// self.stakes.content.1.clone()
// }
pub fn notify_stake_change(
&mut self,

View File

@ -1,4 +1,6 @@
use crate::vote::StoredVote;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::stores::block_information_store::BlockInformation;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::epoch::Epoch as LiteRpcEpoch;
@ -6,6 +8,10 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::default::Default;
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
pub async fn get_current_confirmed_slot(data_cache: &DataCache) -> u64 {
let commitment = CommitmentConfig::confirmed();
@ -21,6 +27,51 @@ pub async fn get_current_epoch(data_cache: &DataCache) -> LiteRpcEpoch {
data_cache.get_current_epoch(commitment).await
}
//Read save epoch vote stake to bootstrap current leader shedule and get_vote_account.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct StringSavedStake {
epoch: u64,
stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
}
pub fn read_schedule_vote_stakes(
file_path: &str,
) -> anyhow::Result<(u64, HashMap<Pubkey, (u64, Arc<StoredVote>)>)> {
let content = std::fs::read_to_string(file_path)?;
let stakes_str: StringSavedStake = serde_json::from_str(&content)?;
//convert to EpochStake because json hashmap parser can only have String key.
let ret_stakes = stakes_str
.stake_vote_map
.into_iter()
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
.collect();
Ok((stakes_str.epoch, ret_stakes))
}
pub fn save_schedule_vote_stakes(
base_file_path: &str,
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
) -> anyhow::Result<()> {
//save new schedule for restart.
//need to convert hahsmap key to String because json aloow only string
//key for dictionnary.
//it's better to use json because the file can use to very some stake by hand.
//in the end it will be removed with the bootstrap process.
let save_stakes = StringSavedStake {
epoch,
stake_vote_map: stake_vote_map
.iter()
.map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
.collect(),
};
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
let mut file = File::create(base_file_path).unwrap();
file.write_all(serialized_stakes.as_bytes()).unwrap();
file.flush().unwrap();
Ok(())
}
//Takable struct code
pub trait TakableContent<T>: Default {
fn add_value(&mut self, val: T);

View File

@ -74,10 +74,6 @@ impl VoteStore {
crate::utils::merge(&mut votestore.votes, vote_map)
}
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
Self::vote_map_insert_vote(&mut self.votes.content, vote_account, vote_data);
}
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
if self
.votes

View File

@ -78,3 +78,13 @@ test('get epoch info', async () => {
});
test('get leader schedule', async () => {
{
const leaderSchedule = await connection.getLeaderSchedule();
expect(Object.keys(leaderSchedule).length > 0);
}
});