Merge pull request #266 from blockworks-foundation/bootstrap_leader_schedule_from_rpc

Bootstrap leader schedule from rpc
This commit is contained in:
galactus 2024-01-08 16:14:35 +01:00 committed by GitHub
commit b49e51da9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 222 additions and 54 deletions

View File

@ -44,15 +44,26 @@ impl LeaderFetcherInterface for GrpcLeaderGetter {
.current
.as_ref()
.map(|e| e.epoch)
.unwrap_or(to_epoch);
.unwrap_or(to_epoch)
+ 1;
if from > to {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} from:{from} > to:{to}"
);
}
if from_epoch < current_epoch || from_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
from_epoch:{from_epoch} < current_epoch:{current_epoch} \
|| from_epoch > next_epoch:{next_epoch}"
);
}
if to_epoch < current_epoch || to_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
to_epoch:{to_epoch} < current_epoch:{current_epoch} \
|| to_epoch:{to_epoch} > next_epoch:{next_epoch}"
);
}
let limit = to - from;

View File

@ -60,7 +60,9 @@ impl EpochCache {
self.epoch_schedule.get_last_slot_in_epoch(epoch)
}
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
pub async fn bootstrap_epoch(
rpc_client: &RpcClient,
) -> anyhow::Result<(EpochCache, EpochInfo)> {
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
@ -72,9 +74,14 @@ impl EpochCache {
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
};
Ok(EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
})
let epoch_info = rpc_client.get_epoch_info().await?;
Ok((
EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
},
epoch_info,
))
}
}

View File

@ -154,8 +154,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?;
let slots_per_epoch = epoch_data.get_epoch_schedule().slots_per_epoch;
let (epoch_data, current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
@ -212,17 +211,15 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let (leader_schedule, rpc_stakes_send): (Arc<dyn LeaderFetcherInterface>, Option<_>) =
if use_grpc && calculate_leader_schedule_form_geyser {
//init leader schedule grpc process.
//1) get stored schedule and stakes
if let Some((leader_schedule, vote_stakes)) =
solana_lite_rpc_stakevote::bootstrap_leaderschedule_from_files(slots_per_epoch)
{
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes)
.await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
//1) get stored leader schedule and stakes (or via RPC if not present)
solana_lite_rpc_stakevote::bootstrat_literpc_leader_schedule(
rpc_client.url(),
&data_cache,
current_epoch_info.epoch,
)
.await;
//2) start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(

View File

@ -11,14 +11,19 @@ use anyhow::bail;
use futures::future::join_all;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::client_error::ClientErrorKind;
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_response::RpcVoteAccountStatus;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_program::slot_history::Slot;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinHandle;
@ -47,9 +52,10 @@ pub async fn bootstrap_schedule_epoch_data(data_cache: &DataCache) -> ScheduleEp
// Return the current and next epoxh leader schedule and the current epoch stakes of vote accounts
// if the corresponding files exist.
pub fn bootstrap_leaderschedule_from_files(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> Option<(CalculatedSchedule, RpcVoteAccountStatus)> {
bootstrap_current_leader_schedule(slots_in_epoch)
bootstrap_current_leader_schedule(slots_in_epoch, current_epoch_of_loading)
.map(|(leader_schedule, current_epoch_stakes, _)| {
let vote_acccounts = crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
&current_epoch_stakes,
@ -59,6 +65,63 @@ pub fn bootstrap_leaderschedule_from_files(
.ok()
}
// Return the current or next epoch leader schedule using the RPC calls.
pub fn bootstrap_leaderschedule_from_rpc(
rpc_url: String,
epoch_schedule: &EpochSchedule,
) -> Result<CalculatedSchedule, ClientError> {
let current_epoch = get_rpc_epoch_info(rpc_url.clone())?;
let current_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), None)?.ok_or(ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
})?;
//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let current_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&current_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;
//get next epoch rpc schedule
let next_epoch = current_epoch.epoch + 1;
let next_first_epoch_slot = epoch_schedule.get_first_slot_in_epoch(next_epoch);
let next_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), Some(next_first_epoch_slot))?.ok_or(
ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
},
)?;
//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let next_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&next_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule_by_node: current_schedule_by_node.clone(),
schedule_by_slot: current_schedule_by_slot.clone(),
epoch: current_epoch.epoch,
}),
next: Some(LeaderScheduleData {
schedule_by_node: next_schedule_by_node,
schedule_by_slot: next_schedule_by_slot,
epoch: current_epoch.epoch + 1,
}),
})
}
/*
Bootstrap state changes
@ -92,8 +155,15 @@ pub fn run_bootstrap_events(
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> anyhow::Result<Option<anyhow::Result<(CalculatedSchedule, RpcVoteAccountStatus)>>> {
let result = process_bootstrap_event(event, stakestore, votestore, slots_in_epoch);
let result = process_bootstrap_event(
event,
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
);
match result {
BootsrapProcessResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
@ -105,6 +175,7 @@ pub fn run_bootstrap_events(
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
),
BootsrapProcessResult::End(leader_schedule_result) => Ok(Some(leader_schedule_result)),
BootsrapProcessResult::Error(err) => bail!(err),
@ -154,6 +225,7 @@ fn process_bootstrap_event(
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap {
@ -161,7 +233,6 @@ fn process_bootstrap_event(
rpc_url,
} => {
let jh = tokio::task::spawn_blocking(move || {
log::info!("BootstrapEvent::InitBootstrap RECV");
if sleep_time > 0 {
std::thread::sleep(Duration::from_secs(sleep_time));
}
@ -180,7 +251,6 @@ fn process_bootstrap_event(
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map((stake_map, (vote_map, epoch_cache))) => {
BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted(
@ -220,8 +290,6 @@ fn process_bootstrap_event(
history,
rpc_url,
) => {
log::info!("BootstrapEvent::StoreExtracted RECV");
let stake_history = crate::account::read_historystake_from_account(&history.data);
if stake_history.is_none() {
return BootsrapProcessResult::Error(
@ -244,7 +312,10 @@ fn process_bootstrap_event(
0, //with RPC no way to know the slot of the account update. Set to 0.
);
match bootstrap_current_leader_schedule(slots_in_epoch) {
match bootstrap_current_leader_schedule(
current_epoch_of_loading,
slots_in_epoch,
) {
Ok((leader_schedule, current_epoch_stakes, next_epoch_stakes)) => {
let vote_acccounts =
crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
@ -279,8 +350,6 @@ fn process_bootstrap_event(
rpc_url,
leader_schedule_result,
) => {
log::info!("BootstrapEvent::AccountsMerged RECV");
match (
stakestore.stakes.merge(stake_map),
votestore.votes.merge((vote_map, epoch_cache)),
@ -314,39 +383,55 @@ fn bootstrap_accounts(
}
fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id());
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake.map(|stake| (stake, rpc_url))
rpc_client
.get_program_accounts(&solana_sdk::stake::program::id())
.map(|stake| (stake, rpc_url))
}
fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id());
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote.map(|votes| (votes, rpc_url))
rpc_client
.get_program_accounts(&solana_sdk::vote::program::id())
.map(|votes| (votes, rpc_url))
}
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
rpc_client.get_account(&solana_sdk::sysvar::stake_history::id())
}
fn get_rpc_epoch_info(rpc_url: String) -> Result<EpochInfo, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_epoch_info()
}
fn get_rpc_leader_schedule(
rpc_url: String,
slot: Option<Slot>,
) -> Result<Option<HashMap<String, Vec<usize>>>, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_leader_schedule(slot)
}
// pub struct BootstrapScheduleResult {
@ -355,6 +440,7 @@ pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError>
// }
pub fn bootstrap_current_leader_schedule(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> anyhow::Result<(CalculatedSchedule, EpochVoteStakes, EpochVoteStakes)> {
let (current_epoch, current_epoch_stakes) =
@ -362,6 +448,18 @@ pub fn bootstrap_current_leader_schedule(
let (next_epoch, next_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?;
//verify that the current loaded epoch correspond to the current epoch slot
if current_epoch_of_loading != current_epoch {
return Err(ClientError {
request: None,
kind: ClientErrorKind::Custom(
"Current epoch bootstrap file doesn't correspond to the validator current epoch."
.to_string(),
),
}
.into());
}
//calcualte leader schedule for all vote stakes.
let current_schedule = crate::leader_schedule::calculate_leader_schedule(
&current_epoch_stakes,

View File

@ -56,7 +56,7 @@ impl ScheduleEpochData {
&mut self,
history: StakeHistory,
) -> Option<LeaderScheduleEvent> {
log::info!("set_epoch_stake_history");
log::debug!("set_epoch_stake_history");
self.new_stake_history = Some(history);
self.verify_epoch_change()
}
@ -68,7 +68,7 @@ impl ScheduleEpochData {
//to avoid to delay too much the schedule, start the calculus at the end of the epoch.
//the first epoch slot arrive very late cause of the stake account notification from the validator.
if self.current_confirmed_slot >= self.last_slot_in_epoch {
log::info!(
log::debug!(
"manage_change_epoch at slot:{} last_slot_in_epoch:{}",
self.current_confirmed_slot,
self.last_slot_in_epoch

View File

@ -13,7 +13,9 @@ use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::task::JoinHandle;
@ -117,7 +119,7 @@ fn process_leadershedule_event(
) => {
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map((stake_map, (vote_map, mut epoch_cache))) => {
log::info!("LeaderScheduleEvent::CalculateScedule");
log::info!("Start calculate leader schedule");
//do the calculus in a blocking task.
let jh = tokio::task::spawn_blocking({
move || {
@ -221,7 +223,6 @@ fn process_leadershedule_event(
(new_epoch, slots_in_epoch, epoch_schedule),
stake_history,
) => {
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
match (
stakestore.stakes.merge(stake_map),
votestore.votes.merge((vote_map, epoch_cache)),
@ -304,10 +305,24 @@ pub fn calculate_leader_schedule(
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(&mut stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
//log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS)
}
pub fn calculate_slot_leaders_from_schedule(
leader_scheudle: &HashMap<String, Vec<usize>>,
) -> Result<Vec<Pubkey>, String> {
let mut slot_leaders_map = BTreeMap::new();
for (pk, index_list) in leader_scheudle {
for index in index_list {
let pubkey = Pubkey::from_str(pk)
.map_err(|err| format!("Pubkey from leader schedule not a plublic key:{err}"))?;
slot_leaders_map.insert(index, pubkey);
}
}
Ok(slot_leaders_map.into_values().collect())
}
// Cribbed from leader_schedule_utils
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a

View File

@ -30,13 +30,55 @@ mod stake;
mod utils;
mod vote;
pub use bootstrap::bootstrap_leaderschedule_from_files;
// pub use bootstrap::{bootstrap_leaderschedule_from_files, bootstrap_leaderschedule_from_rpc};
const STAKESTORE_INITIAL_CAPACITY: usize = 600000;
const VOTESTORE_INITIAL_CAPACITY: usize = 600000;
type Slot = u64;
pub async fn bootstrat_literpc_leader_schedule(
rpc_url: String,
data_cache: &DataCache,
current_epoch_of_loading: u64,
) {
//init leader schedule grpc process.
//1) get stored schedule and stakes
let slots_per_epoch = data_cache.epoch_data.get_epoch_schedule().slots_per_epoch;
match crate::bootstrap::bootstrap_leaderschedule_from_files(
current_epoch_of_loading,
slots_per_epoch,
) {
Some((leader_schedule, vote_stakes)) => {
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes)
.await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
None => {
log::info!("Leader schedule bootstrap file not found. Try to boot from rpc.");
match crate::bootstrap::bootstrap_leaderschedule_from_rpc(
rpc_url,
data_cache.epoch_data.get_epoch_schedule(),
) {
Ok(leader_schedule) => {
log::info!("Leader schedule bootstrap from rpc done.",);
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
Err(err) => {
log::warn!(
"An error occurs during bootstrap of the leader schedule using rpc:{err}"
);
log::warn!("No schedule has been loaded");
}
}
}
}
}
pub async fn start_stakes_and_votes_loop(
data_cache: DataCache,
mut slot_notification: SlotStream,
@ -143,7 +185,7 @@ pub async fn start_stakes_and_votes_loop(
if let Some(account) = account.account {
let acc_id = Pubkey::try_from(account.pubkey).expect("valid pubkey");
if acc_id == solana_sdk::sysvar::stake_history::ID {
log::info!("Geyser notifstake_history");
log::debug!("Geyser notifstake_history");
match crate::account::read_historystake_from_account(account.data.as_slice()) {
Some(stake_history) => {
let schedule_event = current_schedule_epoch.set_epoch_stake_history(stake_history);
@ -244,7 +286,7 @@ pub async fn start_stakes_and_votes_loop(
}
//manage bootstrap event
Some(Ok(event)) = spawned_bootstrap_task.next() => {
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch) {
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch, current_schedule_epoch.current_epoch) {
Ok(Some(boot_res))=> {
match boot_res {
Ok((current_schedule_data, vote_stakes)) => {

View File

@ -36,10 +36,8 @@ impl EpochVoteStakesCache {
}
pub fn add_stakes_for_epoch(&mut self, vote_stakes: EpochVoteStakes) {
log::info!("add_stakes_for_epoch :{}", vote_stakes.epoch);
if self.cache.insert(vote_stakes.epoch, vote_stakes).is_some() {
log::warn!("Override existing vote stake epoch cache for epoch:");
}
log::debug!("add_stakes_for_epoch :{}", vote_stakes.epoch);
self.cache.insert(vote_stakes.epoch, vote_stakes);
}
}