Compare commits

...

3 Commits

Author SHA1 Message Date
musitdev 1c6aa5fe8b change open port 2023-10-06 17:32:56 +02:00
musitdev f91965e1dd manage account removal from notification 2023-10-06 17:20:17 +02:00
musitdev db8a3079cb add stake history calculus and refactor to optimize 2023-10-06 16:25:21 +02:00
7 changed files with 610 additions and 544 deletions

View File

@ -1,12 +1,12 @@
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use crate::Slot;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake_history::StakeHistory;
use std::time::Duration;
use tokio::task::JoinHandle;
/*
@ -64,7 +64,7 @@ pub enum BootstrapEvent {
Vec<(Pubkey, Account)>,
Account,
),
AccountsMerged(StakeMap, VoteMap),
AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap),
Exit,
}
@ -76,8 +76,6 @@ enum BootsrapProcessResult {
pub struct BootstrapData {
pub done: bool,
pub current_epoch: u64,
pub next_epoch_start_slot: Slot,
pub sleep_time: u64,
pub rpc_url: String,
}
@ -116,7 +114,7 @@ fn process_bootstrap_event(
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event(
(Ok((stake_map, _)), Ok(vote_map)) => BootsrapProcessResult::Event(
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
),
_ => {
@ -131,27 +129,20 @@ fn process_bootstrap_event(
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
log::info!("BootstrapEvent::StoreExtracted RECV");
match crate::stakestore::read_historystake_from_account(history) {
Some(stake_history) => {
log::info!(
"Read stake history done with history len:{}",
stake_history.len()
);
stakestore.set_stake_history(stake_history);
}
None => log::error!("Bootstrap error, can't read stake history."),
let stake_history = crate::stakestore::read_historystake_from_account(history);
if stake_history.is_none() {
//TODO return error.
log::error!("Bootstrap error, can't read stake history.");
}
//merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({
let current_epoch = data.current_epoch;
move || {
//update pa_list to set the update slot to the epoch start one.
crate::stakestore::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
0, //with RPC no way to know the slot of the account update. Set to 0.
current_epoch,
);
crate::votestore::merge_program_account_in_vote_map(
&mut vote_map,
@ -159,15 +150,15 @@ fn process_bootstrap_event(
0, //with RPC no way to know the slot of the account update. Set to 0.
);
BootstrapEvent::AccountsMerged(stake_map, vote_map)
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map)
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::AccountsMerged(stake_map, vote_map) => {
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map) => {
log::info!("BootstrapEvent::AccountsMerged RECV");
match (
merge_stakestore(stakestore, stake_map, data.current_epoch),
merge_stakestore(stakestore, stake_map, stake_history),
merge_votestore(votestore, vote_map),
) {
(Ok(()), Ok(())) => BootsrapProcessResult::End,
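
The bootstrap above is an event-driven state machine: each event either yields the next event directly or spawns a blocking task that produces it, so the heavy account merge never blocks the async loop (the real loop pushes the JoinHandle into a FuturesUnordered polled from a select!). A minimal sketch of that pattern with simplified, illustrative types — Event, ProcessResult and the summing work are stand-ins, not the crate's BootstrapEvent/BootsrapProcessResult API:

use tokio::task::JoinHandle;

enum Event {
    Fetched(Vec<u64>),
    Merged(u64),
    Done,
}

enum ProcessResult {
    Event(Event),
    TaskHandle(JoinHandle<Event>),
    End,
}

fn process_event(event: Event) -> ProcessResult {
    match event {
        // Heavy work (merging the fetched accounts) runs off the async runtime.
        Event::Fetched(accounts) => ProcessResult::TaskHandle(tokio::task::spawn_blocking(move || {
            let total: u64 = accounts.iter().sum();
            Event::Merged(total)
        })),
        // Cheap steps yield the next event directly.
        Event::Merged(_total) => ProcessResult::Event(Event::Done),
        Event::Done => ProcessResult::End,
    }
}

#[tokio::main]
async fn main() {
    let mut next = Some(Event::Fetched(vec![1, 2, 3]));
    while let Some(event) = next.take() {
        match process_event(event) {
            ProcessResult::Event(e) => next = Some(e),
            ProcessResult::TaskHandle(jh) => next = Some(jh.await.unwrap()),
            ProcessResult::End => break,
        }
    }
    println!("bootstrap finished");
}
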

View File

@ -1,132 +1,150 @@
use crate::leader_schedule::LeaderScheduleEvent;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_account_decoder::parse_sysvar::SysvarAccountType;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::sync::Arc;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
pub struct Epoch {
pub epoch: u64,
pub slot_index: u64,
pub slots_in_epoch: u64,
pub absolute_slot: Slot,
}
impl Epoch {
pub fn get_next_epoch(&self, current_epoch: &CurrentEpochSlotState) -> Epoch {
let last_slot = current_epoch.epoch_cache.get_last_slot_in_epoch(self.epoch);
current_epoch.epoch_cache.get_epoch_at_slot(last_slot + 1)
}
pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
EpochInfo {
epoch: self.epoch,
slot_index: self.slot_index,
slots_in_epoch: self.slots_in_epoch,
absolute_slot: self.absolute_slot,
block_height,
transaction_count,
}
}
}
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
let slots_in_epoch = current_epoch.current_epoch.slots_in_epoch;
let epoch_start_slot = current_epoch.current_epoch_start_slot();
if slot >= epoch_start_slot {
let slot_distance = (slot - epoch_start_slot) / slots_in_epoch;
current_epoch.current_epoch.epoch + slot_distance
} else {
let slot_distance = (epoch_start_slot - slot) / slots_in_epoch;
current_epoch.current_epoch.epoch - slot_distance
current_epoch.epoch_cache.get_epoch_at_slot(slot).epoch
}
#[derive(Clone, Debug)]
pub struct EpochCache {
epoch_schedule: Arc<EpochSchedule>,
}
impl EpochCache {
pub fn get_epoch_at_slot(&self, slot: Slot) -> Epoch {
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
let slots_in_epoch = self.epoch_schedule.get_slots_in_epoch(epoch);
Epoch {
epoch,
slot_index,
slots_in_epoch,
absolute_slot: slot,
}
}
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
// self.epoch_schedule.as_ref().clone()
// }
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_schedule.get_slots_in_epoch(epoch)
}
// pub fn get_first_slot_in_epoch(&self, epoch: u64) -> u64 {
// self.epoch_schedule.get_first_slot_in_epoch(epoch)
// }
pub fn get_last_slot_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_schedule.get_last_slot_in_epoch(epoch)
}
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
let Some(SysvarAccountType::EpochSchedule(epoch_schedule)) =
bincode::deserialize(&res_epoch.data[..])
.ok()
.map(SysvarAccountType::EpochSchedule)
else {
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
};
Ok(EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
})
}
}
#[derive(Debug)]
pub struct CurrentEpochSlotState {
pub current_slot: CurrentSlot,
pub current_epoch: EpochInfo,
pub next_epoch_start_slot: Slot,
pub first_epoch_slot: bool,
epoch_cache: EpochCache,
current_epoch_value: Epoch,
}
impl CurrentEpochSlotState {
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> {
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
// self.epoch_cache.get_epoch_shedule()
// }
pub fn get_current_epoch(&self) -> Epoch {
self.current_epoch_value
}
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_cache.get_slots_in_epoch(epoch)
}
pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
//get reduce_stake_warmup_cooldown feature info.
//NOT AN ACCOUNT. Get from config.
// let reduce_stake_warmup_cooldown_epoch = rpc_client
// .get_account(&feature_set::reduce_stake_warmup_cooldown::id())
// .await?;
let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?;
// let reduce_stake_warmup_cooldown_epoch = bincode::deserialize(&reduce_stake_warmup_cooldown_epoch.data[..])
// .ok()
// .map(SysvarAccountType::EpochSchedule);
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
let sysvar_epoch_schedule = bincode::deserialize(&res_epoch.data[..])
.ok()
.map(SysvarAccountType::EpochSchedule);
log::info!("sysvar_epoch_schedule {sysvar_epoch_schedule:?}");
// Fetch current epoch
let current_epoch = rpc_client.get_epoch_info().await?;
let next_epoch_start_slot =
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
Ok(CurrentEpochSlotState {
current_slot: Default::default(),
current_epoch,
next_epoch_start_slot,
first_epoch_slot: false,
current_slot: CurrentSlot::default(),
epoch_cache,
current_epoch_value: Epoch::default(),
})
}
pub fn current_epoch_start_slot(&self) -> Slot {
self.current_epoch.absolute_slot - self.current_epoch.slot_index
}
// pub fn current_epoch_start_slot(&self) -> Slot {
// self.epoch_cache
// .get_first_slot_in_epoch(self.current_slot.confirmed_slot)
// }
pub fn current_epoch_end_slot(&self) -> Slot {
self.next_epoch_start_slot - 1
self.epoch_cache
.get_last_slot_in_epoch(self.current_epoch_value.epoch)
}
pub fn process_new_slot(
&mut self,
new_slot: &SubscribeUpdateSlot,
) -> Option<LeaderScheduleEvent> {
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
//on the first slot update, correct the epoch info data.
if self.current_slot.confirmed_slot == 0 {
let diff = new_slot.slot - self.current_epoch.absolute_slot;
self.current_epoch.slot_index += diff;
self.current_epoch.absolute_slot = new_slot.slot;
log::trace!(
"Set current epoch with diff:{diff} slot:{} current:{:?}",
new_slot.slot,
self.current_epoch,
);
//on a new slot, update the slot-related state data.
} else if new_slot.slot > self.current_slot.confirmed_slot {
//update epoch info
let mut diff = new_slot.slot - self.current_slot.confirmed_slot;
//First epoch slot, index is 0 so remove 1 from diff.
if self.first_epoch_slot {
//calculate next epoch data
self.current_epoch.epoch += 1;
//slots can be non-consecutive, use diff.
self.current_epoch.slot_index = 0;
self.current_epoch.absolute_slot = new_slot.slot;
log::info!(
"change_epoch calculated next epoch:{:?} at slot:{}",
self.current_epoch,
new_slot.slot,
);
self.first_epoch_slot = false;
//slot_index starts at 0.
diff -= 1; //diff is always >= 1
self.next_epoch_start_slot =
self.next_epoch_start_slot + self.current_epoch.slots_in_epoch;
//set to the next epoch start.
} else {
self.current_epoch.absolute_slot = new_slot.slot;
}
self.current_epoch.slot_index += diff;
log::trace!(
"Slot epoch with slot:{} , diff:{diff} current epoch state:{self:?}",
new_slot.slot
);
}
}
//update slot state for all commitment levels.
self.current_slot.update_slot(&new_slot);
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
if self.current_epoch_value.epoch == 0 {
//init first epoch
self.current_epoch_value = self.epoch_cache.get_epoch_at_slot(new_slot.slot);
}
self.manage_change_epoch()
} else {
None
@ -134,22 +152,17 @@ impl CurrentEpochSlotState {
}
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
//we change the epoch at the last slot of the current epoch.
if self.current_slot.confirmed_slot >= self.current_epoch_end_slot() {
log::info!(
"End epoch slot detected:{}",
self.current_slot.confirmed_slot
);
//set epoch change effective at next slot.
self.first_epoch_slot = true;
if self.current_slot.confirmed_slot > self.current_epoch_end_slot() {
log::info!("Change epoch at slot:{}", self.current_slot.confirmed_slot);
self.current_epoch_value = self.get_current_epoch().get_next_epoch(&self);
//start the leader schedule calculation
//switch to the second next epoch to calculate the schedule of the next epoch.
//at the current epoch change, the schedule is calculated for the next epoch.
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&self.current_epoch);
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&schedule_epoch);
Some(LeaderScheduleEvent::InitLeaderschedule(schedule_epoch))
Some(crate::leader_schedule::create_schedule_init_event(
self.current_epoch_value.epoch,
self.get_slots_in_epoch(self.current_epoch_value.epoch),
Arc::clone(&self.epoch_cache.epoch_schedule),
))
} else {
None
}
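
The epoch tracking above now delegates all slot/epoch math to the EpochSchedule sysvar through EpochCache. A small sketch of how those lookups behave; the EpochSchedule::custom constructor and the 32-slot epoch are assumptions for illustration, while the crate fetches the real schedule from the epoch_schedule sysvar account:

use solana_sdk::epoch_schedule::EpochSchedule;

fn main() {
    // Hypothetical schedule: 32-slot epochs, no warmup. The real one comes from the sysvar.
    let schedule = EpochSchedule::custom(32, 32, false);

    let slot = 70u64;
    // Same calls the new EpochCache::get_epoch_at_slot / get_last_slot_in_epoch use.
    let (epoch, slot_index) = schedule.get_epoch_and_slot_index(slot);
    let slots_in_epoch = schedule.get_slots_in_epoch(epoch);
    let last_slot = schedule.get_last_slot_in_epoch(epoch);

    println!("slot {slot} -> epoch {epoch}, index {slot_index}/{slots_in_epoch}, epoch ends at slot {last_slot}");
    // The epoch change is detected once confirmed_slot > last_slot,
    // which is what manage_change_epoch checks above.
    assert_eq!(epoch, 2);
    assert_eq!(last_slot, 95);
}
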

View File

@ -1,15 +1,19 @@
use crate::epoch::CurrentEpochSlotState;
use crate::epoch::Epoch;
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::StoredVote;
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use anyhow::bail;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_program::sysvar::epoch_schedule::EpochSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::feature_set::FeatureSet;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::StakeActivationStatus;
use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap;
use std::collections::HashMap;
@ -34,75 +38,69 @@ pub struct CalculatedSchedule {
#[derive(Debug)]
pub struct LeaderScheduleData {
pub schedule: Arc<HashMap<String, Vec<usize>>>,
pub vote_stakes: Vec<(Pubkey, u64)>,
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub epoch: u64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EpochStake {
epoch: u64,
stakes: Vec<(Pubkey, u64)>,
stake_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
}
impl From<SavedStake> for EpochStake {
fn from(saved_stakes: SavedStake) -> Self {
EpochStake {
epoch: saved_stakes.epoch,
stakes: saved_stakes
.stakes
stake_vote_map: saved_stakes
.stake_vote_map
.into_iter()
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), st))
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
.collect(),
}
}
}
pub fn next_schedule_epoch(current_epoch: &EpochInfo) -> EpochInfo {
let mut next_epoch_info = current_epoch.clone();
next_epoch_info.epoch += 1;
next_epoch_info.slot_index = 0;
next_epoch_info.absolute_slot =
current_epoch.absolute_slot + current_epoch.slots_in_epoch - current_epoch.slot_index;
next_epoch_info
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct SavedStake {
epoch: u64,
stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
}
pub fn bootstrap_leader_schedule(
current_file_patch: &str,
next_file_patch: &str,
slots_in_epoch: u64,
epoch_state: &CurrentEpochSlotState,
) -> anyhow::Result<CalculatedSchedule> {
let mut current_stakes = read_schedule_vote_stakes(current_file_patch)?;
let mut next_stakes = read_schedule_vote_stakes(next_file_patch)?;
let current_stakes = read_schedule_vote_stakes(current_file_patch)?;
let next_stakes = read_schedule_vote_stakes(next_file_patch)?;
//calculate leader schedule for all vote stakes.
let current_schedule = calculate_leader_schedule(
&mut current_stakes.stakes,
&current_stakes.stake_vote_map,
current_stakes.epoch,
slots_in_epoch,
)?;
let next_schedule =
calculate_leader_schedule(&mut next_stakes.stakes, next_stakes.epoch, slots_in_epoch)?;
epoch_state.get_slots_in_epoch(current_stakes.epoch),
);
let next_schedule = calculate_leader_schedule(
&next_stakes.stake_vote_map,
next_stakes.epoch,
epoch_state.get_slots_in_epoch(next_stakes.epoch),
);
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule: Arc::new(current_schedule),
vote_stakes: current_stakes.stakes,
epoch: current_stakes.epoch,
vote_stakes: current_stakes.stake_vote_map,
epoch: epoch_state.get_slots_in_epoch(current_stakes.epoch),
}),
next: Some(LeaderScheduleData {
schedule: Arc::new(next_schedule),
vote_stakes: next_stakes.stakes,
epoch: next_stakes.epoch,
vote_stakes: next_stakes.stake_vote_map,
epoch: epoch_state.get_slots_in_epoch(next_stakes.epoch),
}),
})
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct SavedStake {
epoch: u64,
stakes: Vec<(String, u64)>,
}
fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
let content = std::fs::read_to_string(file_path)?;
let stakes_str: SavedStake = serde_json::from_str(&content)?;
@ -112,16 +110,16 @@ fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
fn save_schedule_vote_stakes(
base_file_path: &str,
stakes: &Vec<(Pubkey, u64)>,
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
) -> anyhow::Result<()> {
//save new schedule for restart.
let filename = format!("{base_file_path}_{epoch}.json",);
let filename = format!("{base_file_path}_{}.json", epoch);
let save_stakes = SavedStake {
epoch,
stakes: stakes
stake_vote_map: stake_vote_map
.iter()
.map(|(pk, st)| (pk.to_string(), *st))
.map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
.collect(),
};
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
@ -157,11 +155,8 @@ pub fn run_leader_schedule_events(
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> Option<(
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
)> {
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
) -> Option<LeaderScheduleData> {
let result = process_leadershedule_event(event, stakestore, votestore);
match result {
LeaderScheduleResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
@ -170,132 +165,155 @@ pub fn run_leader_schedule_events(
LeaderScheduleResult::Event(event) => {
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
}
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
LeaderScheduleResult::End(schedule) => Some(schedule),
}
}
// struct LeaderScheduleEpochData {
// current_epoch: u64,
// next_epoch: u64,
// slots_in_epoch: u64,
// epoch_schedule: Arc<EpochSchedule>,
// }
pub enum LeaderScheduleEvent {
InitLeaderschedule(EpochInfo),
CalculateScedule(StakeMap, VoteMap, EpochInfo, Option<StakeHistory>),
Init(u64, u64, Arc<EpochSchedule>),
MergeStoreAndSaveSchedule(
StakeMap,
VoteMap,
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
LeaderScheduleData,
(u64, u64, Arc<EpochSchedule>),
Option<StakeHistory>,
),
}
enum LeaderScheduleResult {
TaskHandle(JoinHandle<LeaderScheduleEvent>),
Event(LeaderScheduleEvent),
End(
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
),
End(LeaderScheduleData),
}
pub fn create_schedule_init_event(
current_epoch: u64,
slots_in_epoch: u64,
epoch_schedule: Arc<EpochSchedule>,
) -> LeaderScheduleEvent {
log::info!("LeaderScheduleEvent::InitLeaderschedule START");
//TODO find a way to be notified of stake history updates.
//request the current state using RPC.
//TODO remove the await from the scheduler task.
// let before_stake_histo = stake_history.clone();
// let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
// .map(|account| crate::stakestore::read_historystake_from_account(account))
// .unwrap_or_else(|_| {
// log::error!("Error during stake history fetch. Use bootstrap one");
// stake_history.map(|st| st.as_ref().clone())
// });
// log::info!(
// "stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
// epoch_data.current_epoch
// );
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule)
}
//TODO remove deactivated accounts after the leader schedule calculation.
fn process_leadershedule_event(
rpc_url: String,
// rpc_url: String,
event: LeaderScheduleEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> LeaderScheduleResult {
match event {
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => {
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
//For test. TODO: put in the extract and restore process to avoid cloning.
let stake_history = stakestore.get_cloned_stake_history();
//TODO get a way to be updated of stake history.
//request the current state using RPC.
//TODO remove the await from the scheduler task.
let before_stake_histo = stake_history.clone();
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
.map(|account| crate::stakestore::read_historystake_from_account(account))
.unwrap_or_else(|_| {
log::error!("Error during stake history fetch. Use bootstrap one");
stake_history
});
log::info!(
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
schedule_epoch.epoch
);
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule) => {
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => {
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
stake_map,
vote_map,
schedule_epoch,
stake_history,
))
}
_ => {
log::warn!("process_leadershedule_event error during extract store");
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch)
(Ok((stake_map, mut stake_history)), Ok(vote_map)) => {
//For test. TODO: put in the extract and restore process to avoid cloning.
log::info!("LeaderScheduleEvent::CalculateScedule");
let epoch_schedule = Arc::clone(&epoch_schedule);
let jh = tokio::task::spawn_blocking({
move || {
let next_epoch = current_epoch + 1;
let epoch_vote_stakes = calculate_epoch_stakes(
&stake_map,
&vote_map,
current_epoch,
next_epoch,
stake_history.as_mut(),
&epoch_schedule,
);
let leader_schedule = calculate_leader_schedule(
&epoch_vote_stakes,
next_epoch,
slots_in_epoch,
);
// let epoch_vote_stakes_old = calculate_epoch_stakes_old_algo(
// &stake_map,
// &vote_map,
// &epoch_state.next_epoch,
// stake_history.as_ref(),
// );
// let leader_schedule_old =
// calculate_leader_schedule(&epoch_vote_stakes_old, &next_epoch);
if let Err(err) = save_schedule_vote_stakes(
SCHEDULE_STAKE_BASE_FILE_NAME,
&epoch_vote_stakes,
next_epoch,
) {
log::error!(
"Error during saving in file of the new schedule of epoch:{} error:{err}",
next_epoch
);
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
LeaderScheduleData {
schedule: Arc::new(leader_schedule),
vote_stakes: epoch_vote_stakes,
epoch: next_epoch,
},
(current_epoch, slots_in_epoch, epoch_schedule),
stake_history,
)
}
});
LeaderScheduleResult::TaskHandle(jh)
}
}
}
LeaderScheduleEvent::CalculateScedule(
stake_map,
vote_map,
schedule_epoch,
stake_history,
) => {
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
let jh = tokio::task::spawn_blocking({
move || {
let schedule_and_stakes =
crate::leader_schedule::calculate_leader_schedule_from_stake_map(
&stake_map,
&vote_map,
&schedule_epoch,
stake_history.as_ref(),
);
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
if let Err(err) = save_schedule_vote_stakes(
SCHEDULE_STAKE_BASE_FILE_NAME,
vote_stakes,
schedule_epoch.epoch,
) {
log::error!(
"Error during saving in file of the new schedule of epoch:{} error:{err}",
schedule_epoch.epoch
);
}
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
schedule_and_stakes,
schedule_epoch,
)
_ => {
log::error!("Create leadershedule init event error during extract store");
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
current_epoch,
slots_in_epoch,
epoch_schedule,
))
}
});
LeaderScheduleResult::TaskHandle(jh)
}
}
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
schedule,
schedule_epoch,
schedule_data,
(current_epoch, slots_in_epoch, epoch_schedule),
stake_history,
) => {
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
match (
merge_stakestore(stakestore, stake_map, schedule_epoch.epoch),
merge_stakestore(stakestore, stake_map, stake_history),
merge_votestore(votestore, vote_map),
) {
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule, schedule_epoch),
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
_ => {
//this should never happen because the store has been extracted before.
//TODO remove this error using type state
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
LeaderScheduleResult::Event(LeaderScheduleEvent::InitLeaderschedule(
schedule_epoch,
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
current_epoch,
slots_in_epoch,
epoch_schedule,
))
}
}
@ -303,65 +321,68 @@ fn process_leadershedule_event(
}
}
// fn calculate_epoch_stakes(stake_map: &StakeMap) {
// let stake_history_entry =
fn calculate_epoch_stakes(
stake_map: &StakeMap,
vote_map: &VoteMap,
current_epoch: u64,
next_epoch: u64,
mut stake_history: Option<&mut StakeHistory>,
epoch_schedule: &EpochSchedule,
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
//update stake history with current end epoch stake values.
let new_rate_activation_epoch =
FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule);
let stake_history_entry =
stake_map
.values()
.fold(StakeActivationStatus::default(), |acc, stake_account| {
let delegation = stake_account.stake;
acc + delegation.stake_activating_and_deactivating(
current_epoch,
stake_history.as_deref(),
new_rate_activation_epoch,
)
});
match stake_history {
Some(ref mut stake_history) => stake_history.add(current_epoch, stake_history_entry),
None => log::warn!("Vote stake calculation without Stake History"),
};
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
// // Wrap up the prev epoch by adding new stake history entry for the
// // prev epoch.
// let stake_history_entry = thread_pool.install(|| {
// stake_delegations
// .par_iter()
// .fold(StakeActivationStatus::default, |acc, stake_account| {
// let delegation = stake_account.delegation();
// acc + delegation.stake_activating_and_deactivating(
// self.epoch,
// Some(&self.stake_history),
// new_rate_activation_epoch,
// )
// })
// .reduce(StakeActivationStatus::default, Add::add)
// });
// self.stake_history.add(self.epoch, stake_history_entry);
// self.epoch = next_epoch;
// // Refresh the stake distribution of vote accounts for the next epoch,
// // using new stake history.
// let delegated_stakes = thread_pool.install(|| {
// stake_delegations
// .par_iter()
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
// let delegation = stake_account.delegation();
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
// *entry += delegation.stake(
// self.epoch,
// Some(&self.stake_history),
// new_rate_activation_epoch,
// );
// delegated_stakes
// })
// .reduce(HashMap::default, merge)
// });
// self.vote_accounts = self
// .vote_accounts
// .iter()
// .map(|(&vote_pubkey, vote_account)| {
// let delegated_stake = delegated_stakes
// .get(&vote_pubkey)
// .copied()
// .unwrap_or_default();
// (vote_pubkey, (delegated_stake, vote_account.clone()))
// })
// .collect();
//calculate schedule stakes.
let delegated_stakes: HashMap<Pubkey, u64> =
stake_map
.values()
.fold(HashMap::default(), |mut delegated_stakes, stake_account| {
let delegation = stake_account.stake;
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry += delegation.stake(
next_epoch,
stake_history.as_deref(),
new_rate_activation_epoch,
);
delegated_stakes
});
// }
let staked_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)> = vote_map
.values()
.map(|vote_account| {
let delegated_stake = delegated_stakes
.get(&vote_account.pubkey)
.copied()
.unwrap_or_default();
(vote_account.pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
staked_vote_map
}
fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
current_epoch_info: &EpochInfo,
fn calculate_epoch_stakes_old_algo(
stake_map: &StakeMap,
vote_map: &VoteMap,
next_epoch: &Epoch,
stake_history: Option<&StakeHistory>,
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
let mut stakes = HashMap::<Pubkey, u64>::new();
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
let mut stakes = HashMap::<Pubkey, (u64, Arc<StoredVote>)>::new();
log::info!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
vote_map.len(),
@ -369,7 +390,7 @@ fn calculate_leader_schedule_from_stake_map(
stake_history.as_ref().map(|h| h.len())
);
let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
let ten_epoch_slot_long = 10 * next_epoch.slots_in_epoch;
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
for storestake in stake_map.values() {
@ -392,47 +413,149 @@ fn calculate_leader_schedule_from_stake_map(
"leader schedule vote:{} with None root_slot, add it",
vote_account.pubkey
);
current_epoch_info.absolute_slot
}) < current_epoch_info
.absolute_slot
.saturating_sub(ten_epoch_slot_long)
next_epoch.absolute_slot
}) < next_epoch.absolute_slot.saturating_sub(ten_epoch_slot_long)
{
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
, storestake.stake.voter_pubkey
,vote_account.vote_data.node_pubkey
//TODO use the right reduce_stake_warmup_cooldown_epoch value from validator feature.
, storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
, storestake.stake.stake(next_epoch.epoch, stake_history, Some(0)),
);
} else {
let effective_stake = storestake
.stake
//TODO use the right reduce_stake_warmup_cooldown_epoch value from validator feature.
.stake(current_epoch_info.epoch, stake_history, Some(0));
//only vote account with positive stake are use for the schedule.
if effective_stake > 0 {
*(stakes
.entry(vote_account.vote_data.node_pubkey)
.or_insert(0)) += effective_stake;
} else {
log::info!(
"leader schedule vote:{} with 0 effective vote",
storestake.stake.voter_pubkey
);
}
.stake(next_epoch.epoch, stake_history, Some(0));
stakes
.entry(vote_account.pubkey)
.or_insert((0, vote_account.clone()))
.0 += effective_stake;
// //only vote account with positive stake are use for the schedule.
// if effective_stake > 0 {
// *(stakes
// .entry(vote_account.vote_data.node_pubkey)
// .or_insert(0)) += effective_stake;
// } else {
// log::info!(
// "leader schedule vote:{} with 0 effective vote",
// storestake.stake.voter_pubkey
// );
// }
}
}
let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
schedule_stakes.extend(stakes.drain());
let leader_schedule = calculate_leader_schedule(
&mut schedule_stakes,
current_epoch_info.epoch,
current_epoch_info.slots_in_epoch,
)?;
Ok((leader_schedule, schedule_stakes))
stakes
}
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
fn calculate_leader_schedule(
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
slots_in_epoch: u64,
) -> HashMap<String, Vec<usize>> {
let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map
.iter()
.filter_map(|(pk, (stake, _))| (*stake > 0).then_some((*pk, *stake)))
.collect();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(&mut stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
let slot_schedule = schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect();
slot_schedule
}
// fn calculate_leader_schedule_from_stake_map(
// stake_map: &crate::stakestore::StakeMap,
// vote_map: &crate::votestore::VoteMap,
// current_epoch_info: &EpochInfo,
// stake_history: Option<&StakeHistory>,
// ) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
// let mut stakes = HashMap::<Pubkey, u64>::new();
// log::info!(
// "calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
// vote_map.len(),
// stake_map.len(),
// stake_history.as_ref().map(|h| h.len())
// );
// let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
// //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
// for storestake in stake_map.values() {
// //get nodeid for vote account
// let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
// log::info!(
// "Vote account not found in vote map for stake vote account:{}",
// &storestake.stake.voter_pubkey
// );
// continue;
// };
// //TODO validate the number of epoch.
// //remove vote account that hasn't vote since 10 epoch.
// //on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
// //hasn't vote since a long time but still return on RPC call get_voteaccounts.
// //the validator don't use it for leader schedule.
// if vote_account.vote_data.root_slot.unwrap_or_else(|| {
// //vote with no root_slot are added. They have just been activated and can have active stake- TODO TO TEST.
// log::info!(
// "leader schedule vote:{} with None root_slot, add it",
// vote_account.pubkey
// );
// current_epoch_info.absolute_slot
// }) < current_epoch_info
// .absolute_slot
// .saturating_sub(ten_epoch_slot_long)
// {
// log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
// , storestake.stake.voter_pubkey
// ,vote_account.vote_data.node_pubkey
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
// , storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
// );
// } else {
// let effective_stake = storestake
// .stake
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
// .stake(current_epoch_info.epoch, stake_history, Some(0));
// //only vote account with positive stake are use for the schedule.
// if effective_stake > 0 {
// *(stakes
// .entry(vote_account.vote_data.node_pubkey)
// .or_insert(0)) += effective_stake;
// } else {
// log::info!(
// "leader schedule vote:{} with 0 effective vote",
// storestake.stake.voter_pubkey
// );
// }
// }
// }
// let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
// schedule_stakes.extend(stakes.drain());
// let leader_schedule = calculate_leader_schedule_old(
// &mut schedule_stakes,
// current_epoch_info.epoch,
// current_epoch_info.slots_in_epoch,
// )?;
// Ok((leader_schedule, schedule_stakes))
// }
// fn is_stake_to_add(
// stake_pubkey: Pubkey,
// stake: &Delegation,
@ -461,30 +584,30 @@ fn calculate_leader_schedule_from_stake_map(
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
fn calculate_leader_schedule(
stakes: &mut Vec<(Pubkey, u64)>,
epoch: u64,
slots_in_epoch: u64,
) -> anyhow::Result<HashMap<String, Vec<usize>>> {
if stakes.is_empty() {
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
}
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
// fn calculate_leader_schedule_old(
// stakes: &mut Vec<(Pubkey, u64)>,
// epoch: u64,
// slots_in_epoch: u64,
// ) -> anyhow::Result<HashMap<String, Vec<usize>>> {
// if stakes.is_empty() {
// bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
// }
// let mut seed = [0u8; 32];
// seed[0..8].copy_from_slice(&epoch.to_le_bytes());
// sort_stakes(stakes);
// log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
// let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
let slot_schedule = schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect();
Ok(slot_schedule)
}
// let slot_schedule = schedule
// .get_slot_leaders()
// .iter()
// .enumerate()
// .map(|(i, pk)| (pk.to_string(), i))
// .into_group_map()
// .into_iter()
// .collect();
// Ok(slot_schedule)
// }
// Cribbed from leader_schedule_utils
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
@ -603,7 +726,7 @@ use solana_sdk::stake::state::StakeState;
pub fn build_current_stakes(
stake_map: &crate::stakestore::StakeMap,
stake_history: Option<&StakeHistory>,
current_epoch_info: &EpochInfo,
current_epoch: u64,
rpc_url: String,
commitment: CommitmentConfig,
) -> BTreeMap<String, (u64, u64)> {
@ -628,10 +751,9 @@ pub fn build_current_stakes(
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
StakeState::Stake(_, stake) => {
//vote account version
let effective_stake =
stake
.delegation
.stake(current_epoch_info.epoch, stake_history, Some(0));
let effective_stake = stake
.delegation
.stake(current_epoch, stake_history, Some(0));
if effective_stake > 0 {
// Add the stake in this stake account to the total for the delegated-to vote account
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
@ -657,10 +779,7 @@ pub fn build_current_stakes(
stake_map
.iter()
.filter_map(|(pubkey, stake)| {
let effective_stake =
stake
.stake
.stake(current_epoch_info.epoch, stake_history, Some(0));
let effective_stake = stake.stake.stake(current_epoch, stake_history, Some(0));
(effective_stake > 0).then(|| (pubkey, stake, effective_stake))
})
.for_each(|(pubkey, stake, effective_stake)| {
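
The new calculate_leader_schedule feeds the positive stakes, sorted for cross-node determinism, into LeaderSchedule::new with a seed derived from the epoch number. A minimal usage sketch of that API; the two pubkeys, the stake amounts and the 64-slot epoch are made up:

use solana_ledger::leader_schedule::LeaderSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::pubkey::Pubkey;

fn main() {
    // Two made-up validators with unequal stake; the crate builds this list
    // from the vote map and sorts it (sort_stakes) so every node gets the same order.
    let stakes = vec![
        (Pubkey::new_unique(), 75_u64),
        (Pubkey::new_unique(), 25_u64),
    ];

    // Seed the schedule PRNG with the epoch so the result is deterministic per epoch.
    let epoch = 42u64;
    let mut seed = [0u8; 32];
    seed[0..8].copy_from_slice(&epoch.to_le_bytes());

    let slots_in_epoch = 64;
    let schedule =
        LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);

    // One leader pubkey per slot, repeated in runs of NUM_CONSECUTIVE_LEADER_SLOTS.
    let leaders = schedule.get_slot_leaders();
    assert_eq!(leaders.len(), slots_in_epoch as usize);
    println!("slot 0 leader: {}", leaders[0]);
}
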

View File

@ -48,7 +48,6 @@ use solana_sdk::stake::state::Delegation;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tokio::time::Duration;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
@ -169,7 +168,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
&schedule_current_file,
&schedule_next_file,
current_epoch_state.current_epoch.slots_in_epoch,
&current_epoch_state,
) {
Ok(data) => data,
Err(err) => {
@ -246,8 +245,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//Init bootstrap process
let mut bootstrap_data = BootstrapData {
done: false,
current_epoch: current_epoch_state.current_epoch.epoch,
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
sleep_time: 1,
rpc_url: RPC_URL.to_string(),
};
@ -261,7 +258,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//TODO remove. Store parent hash to check that we don't miss a block.
let mut parent_block_slot = None;
loop {
tokio::select! {
Some(req) = request_rx.recv() => {
@ -270,13 +266,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
tokio::task::spawn_blocking({
log::info!("RPC start save_stakes");
let current_stakes = stakestore.get_cloned_stake_map();
let history = stakestore.get_cloned_stake_history();
let move_epoch = current_epoch_state.current_epoch.clone();
let move_epoch = current_epoch_state.get_current_epoch();
let stake_history = stakestore.get_stake_history();
move || {
let current_stake = crate::leader_schedule::build_current_stakes(
&current_stakes,
history.as_ref(),
&move_epoch,
stake_history.as_ref(),
move_epoch.epoch,
RPC_URL.to_string(),
CommitmentConfig::confirmed(),
);
@ -332,45 +328,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
//Manage RPC call result execution
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
RPC_URL.to_string(),
event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
) {
leader_schedule_data.current = leader_schedule_data.next.take();
match new_schedule {
Ok((new_schedule, new_stakes)) => {
leader_schedule_data.next = Some(LeaderScheduleData {
schedule: Arc::new(new_schedule),
vote_stakes: new_stakes,
epoch: epoch.epoch,
});
},
Err(err) => {
log::error!("Error during new leader schedule generation:{err}");
log::error!("No leader schedule available for next epoch.");
}
}
//TODO remove verification once the schedule is correct.
//verify the calculated schedule against the one returned by the RPC.
// if let Some(schedule) = new_schedule {
// tokio::task::spawn_blocking(|| {
// //10 second that the schedule has been calculated on the validator
// std::thread::sleep(std::time::Duration::from_secs(5));
// log::info!("Start Verify schedule");
// if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
// log::warn!("Error during schedule verification:{err}");
// }
// log::info!("End Verify schedule");
// });
// }
}
);
leader_schedule_data.current = leader_schedule_data.next.take();
leader_schedule_data.next = new_leader_schedule;
}
//get confirmed slot or account
@ -392,7 +358,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
if let Err(err) = stakestore.notify_stake_change(
account,
current_epoch_state.current_epoch_end_slot(),
current_epoch_state.current_epoch.epoch,
) {
log::warn!("Can't add new stake from account data err:{}", err);
continue;
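
With run_leader_schedule_events now returning an Option<LeaderScheduleData>, the loop above just rotates schedules: the previous "next" becomes "current" and the freshly computed one becomes "next". A reduced sketch of that rotation, with the schedule type simplified to an epoch number (not the crate's exact types):

#[derive(Debug)]
struct ScheduleData {
    epoch: u64,
}

#[derive(Debug)]
struct Schedules {
    current: Option<ScheduleData>,
    next: Option<ScheduleData>,
}

impl Schedules {
    // Called when the blocking schedule task hands back the schedule it computed.
    fn rotate(&mut self, new_next: Option<ScheduleData>) {
        self.current = self.next.take();
        self.next = new_next;
    }
}

fn main() {
    let mut schedules = Schedules {
        current: Some(ScheduleData { epoch: 500 }),
        next: Some(ScheduleData { epoch: 501 }),
    };
    // Epoch 501 begins; the task just finished computing epoch 502's schedule.
    schedules.rotate(Some(ScheduleData { epoch: 502 }));
    assert_eq!(schedules.current.as_ref().unwrap().epoch, 501);
    assert_eq!(schedules.next.as_ref().unwrap().epoch, 502);
    println!("{schedules:?}");
}
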

View File

@ -22,7 +22,7 @@ use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000";
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3001";
const SERVER_ERROR_MSG: &str = "Internal server error";
//internal RPC access
@ -88,7 +88,10 @@ pub struct RPCServer {
#[jsonrpsee::core::async_trait]
impl ConsensusRpcServer for RPCServer {
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> {
async fn get_latest_blockhash(
&self,
_config: Option<RpcContextConfig>,
) -> Result<RpcBlockhash> {
todo!()
}
@ -137,7 +140,11 @@ pub fn server_rpc_request(
) {
match request {
crate::rpc::Requests::EpochInfo(tx) => {
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
if let Err(err) = tx.send(
current_epoch_state
.get_current_epoch()
.into_epoch_info(0, None),
) {
log::warn!("Channel error during sending back request status error:{err:?}");
}
}
@ -158,11 +165,11 @@ pub fn server_rpc_request(
log::info!(
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
current_epoch_state.current_epoch.epoch
current_epoch_state.get_current_epoch().epoch
);
//currently only returns the schedule for the current or next epoch.
let get_schedule_fn = |schedule: &LeaderScheduleData| {
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //Arc clone.
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //clone the schedule. TODO clone arc.
};
let schedule = leader_schedules
.current
@ -285,7 +292,7 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
Err("Error stake store extracted".to_string())
} else {
//replace Pubkey with String. JSON only allows dictionary keys to be strings.
let ret: HashMap<String, StoredVote> = accounts
let ret: HashMap<String, Arc<StoredVote>> = accounts
.into_iter()
.map(|(pk, acc)| (pk.to_string(), acc))
.collect();
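
The RPC handlers above keep answering through the main loop: each request carries a oneshot sender, the loop owns the state and replies on it, which is why server_rpc_request only needs tx.send(...). A minimal sketch of that request/reply pattern, with the request enum reduced to a single, illustrative variant:

use tokio::sync::{mpsc, oneshot};

// Reduced version of the crate's Requests enum: the reply channel travels with the request.
enum Request {
    EpochInfo(oneshot::Sender<u64>),
}

#[tokio::main]
async fn main() {
    let (request_tx, mut request_rx) = mpsc::channel::<Request>(16);

    // Stand-in for the main loop: owns the state and answers requests.
    tokio::spawn(async move {
        let current_epoch = 501u64;
        while let Some(req) = request_rx.recv().await {
            match req {
                Request::EpochInfo(reply) => {
                    // Ignore the error if the RPC side already gave up.
                    let _ = reply.send(current_epoch);
                }
            }
        }
    });

    // Stand-in for an RPC handler: send the request, await the oneshot reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    request_tx
        .send(Request::EpochInfo(reply_tx))
        .await
        .map_err(|_| "main loop stopped")
        .unwrap();
    let epoch = reply_rx.await.unwrap();
    assert_eq!(epoch, 501);
    println!("epoch info: {epoch}");
}
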

View File

@ -12,13 +12,14 @@ use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use solana_sdk::stake_history::StakeHistory;
use std::collections::HashMap;
use std::str::FromStr;
use tokio::sync::mpsc::Sender;
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
pub type StakeMap = HashMap<Pubkey, StoredStake>;
pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMap> {
pub fn extract_stakestore(
stakestore: &mut StakeStore,
) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
let new_store = std::mem::take(stakestore);
let (new_store, stake_map) = new_store.extract()?;
*stakestore = new_store;
@ -28,21 +29,15 @@ pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMa
pub fn merge_stakestore(
stakestore: &mut StakeStore,
stake_map: StakeMap,
current_epoch: u64,
stake_history: Option<StakeHistory>,
) -> anyhow::Result<()> {
let new_store = std::mem::take(stakestore);
let new_store = new_store.merge_stakes(stake_map, current_epoch)?;
let new_store = new_store.merge_stakes(stake_map, stake_history)?;
*stakestore = new_store;
Ok(())
}
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch: u64) {
//don't add stakes that are already deactivated.
//there are some strange stakes with activation_epoch = max epoch and deactivation_epoch = 90 that are taken into account.
//Must be better defined.
// if stake.stake.deactivation_epoch < current_epoch {
// return;
// }
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) {
//log::info!("stake_map_notify_stake stake:{stake:?}");
match map.entry(stake.pubkey) {
// If value already exists, then increment it by one
@ -70,7 +65,6 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch:
pub enum ExtractedAction {
Notify {
stake: StoredStake,
current_epoch: u64,
},
Remove(Pubkey, Slot),
Merge {
@ -85,18 +79,12 @@ pub enum ExtractedAction {
impl ExtractedAction {
fn get_update_slot(&self) -> u64 {
match self {
ExtractedAction::Notify { stake, .. } => stake.last_update_slot,
ExtractedAction::Notify { stake } => stake.last_update_slot,
ExtractedAction::Remove(_, slot) => *slot,
ExtractedAction::Merge { update_slot, .. } => *update_slot,
ExtractedAction::None => 0,
}
}
fn update_epoch(&mut self, epoch: u64) {
if let ExtractedAction::Notify { current_epoch, .. } = self {
*current_epoch = epoch;
}
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@ -108,16 +96,6 @@ pub struct StoredStake {
pub write_version: u64,
}
// impl StoredStake {
// fn is_removed(&self, current_epoch: u64) -> bool {
// self.stake.activation_epoch != crate::leader_schedule::MAX_EPOCH_VALUE
// && self.stake.deactivation_epoch < current_epoch
// }
// fn is_inserted(&self, current_epoch: u64) -> bool {
// self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
// || self.stake.deactivation_epoch >= current_epoch
// }
// }
#[derive(Debug, Default)]
pub struct StakeStore {
stakes: StakeMap,
@ -136,8 +114,12 @@ impl StakeStore {
}
}
pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
self.stake_history = Some(stake_history);
// pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
// self.stake_history = Some(Arc::new(stake_history));
// }
pub fn get_stake_history(&self) -> Option<StakeHistory> {
self.stake_history.clone()
}
pub fn nb_stake_account(&self) -> usize {
@ -147,48 +129,34 @@ impl StakeStore {
pub fn get_cloned_stake_map(&self) -> StakeMap {
self.stakes.clone()
}
pub fn get_cloned_stake_history(&self) -> Option<StakeHistory> {
self.stake_history.clone()
}
pub fn notify_stake_change(
&mut self,
new_account: AccountPretty,
account: AccountPretty,
current_end_epoch_slot: Slot,
current_epoch: u64,
) -> anyhow::Result<()> {
let Ok(delegated_stake_opt) = new_account.read_stake() else {
bail!("Can't read stake from account data");
};
if let Some(delegated_stake) = delegated_stake_opt {
let stake = StoredStake {
pubkey: new_account.pubkey,
lamports: new_account.lamports,
stake: delegated_stake,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
//if lamports == 0 the account has been removed.
if account.lamports == 0 {
self.notify_stake_action(
ExtractedAction::Notify {
stake,
current_epoch,
},
ExtractedAction::Remove(account.pubkey, account.slot),
current_end_epoch_slot,
);
} else {
let Ok(delegated_stake_opt) = account.read_stake() else {
bail!("Can't read stake from account data");
};
//during extract, don't insert now an account change that
//has been done in the next epoch; push the new update
//into the update pool to be merged at the next epoch change.
// let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot;
// match insert_stake {
// false => self.updates.push(ExtractedAction::Notify {
// stake_account: new_account.pubkey,
// stake: ststake,
// }),
// true => self.notify_stake(new_account.pubkey, ststake, current_epoch),
// }
if let Some(delegated_stake) = delegated_stake_opt {
let stake = StoredStake {
pubkey: account.pubkey,
lamports: account.lamports,
stake: delegated_stake,
last_update_slot: account.slot,
write_version: account.write_version,
};
self.notify_stake_action(ExtractedAction::Notify { stake }, current_end_epoch_slot);
}
}
Ok(())
@ -206,11 +174,8 @@ impl StakeStore {
fn process_stake_action(&mut self, action: ExtractedAction) {
match action {
ExtractedAction::Notify {
stake,
current_epoch,
} => {
stake_map_notify_stake(&mut self.stakes, stake, current_epoch);
ExtractedAction::Notify { stake } => {
stake_map_notify_stake(&mut self.stakes, stake);
}
ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot),
//not used currently. TODO remove.
@ -229,27 +194,31 @@ impl StakeStore {
//return the contained stake map to do an external update.
// During the extract period (between extract and merge) added stakes are stored to be processed later.
//if the store is already extracted return an error.
fn extract(self) -> anyhow::Result<(Self, StakeMap)> {
fn extract(self) -> anyhow::Result<(Self, (StakeMap, Option<StakeHistory>))> {
if self.extracted {
bail!("StakeStore already extracted. Try later");
}
let stakestore = StakeStore {
stakes: HashMap::new(),
stake_history: self.stake_history,
stake_history: None,
updates: self.updates,
extracted: true,
};
Ok((stakestore, self.stakes))
Ok((stakestore, (self.stakes, self.stake_history)))
}
//Merge the stake map and replace the current one.
fn merge_stakes(mut self, stakes: StakeMap, current_epoch: u64) -> anyhow::Result<Self> {
fn merge_stakes(
mut self,
stakes: StakeMap,
stake_history: Option<StakeHistory>,
) -> anyhow::Result<Self> {
if !self.extracted {
bail!("StakeStore merge of non extracted map. Try later");
}
let mut stakestore = StakeStore {
stakes,
stake_history: self.stake_history,
stake_history,
updates: vec![],
extracted: false,
};
@ -257,25 +226,9 @@ impl StakeStore {
self.updates
.sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot()));
//apply stakes added during extraction.
for mut action in self.updates {
action.update_epoch(current_epoch);
for action in self.updates {
stakestore.process_stake_action(action);
}
//verify one stake account to test. TODO remove
let stake_account = stakestore
.stakes
.get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
let stake = stake_account.map(|stake| {
stake
.stake
.stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
});
log::info!(
"merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
stake
);
Ok(stakestore)
}
@ -286,7 +239,7 @@ impl StakeStore {
.map(|stake| stake.last_update_slot <= update_slot)
.unwrap_or(true)
{
log::info!("remove_from_store for {}", account_pk.to_string());
log::info!("Stake remove_from_store for {}", account_pk.to_string());
self.stakes.remove(account_pk);
}
}
@ -296,7 +249,6 @@ pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap,
stakes_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
current_epoch: u64,
) {
stakes_list
.into_iter()
@ -318,7 +270,7 @@ pub fn merge_program_account_in_strake_map(
write_version: 0,
};
stake_map_notify_stake(stake_map, stake, current_epoch);
stake_map_notify_stake(stake_map, stake);
});
}
@ -764,21 +716,21 @@ pub async fn process_stake_tx_message(
current_end_epoch_slot,
);
send_verification(
stake_sender,
stakestore,
"Merge Destination",
account_keys[instruction.accounts[0] as usize],
)
.await;
// send_verification(
// stake_sender,
// stakestore,
// "Merge Destination",
// account_keys[instruction.accounts[0] as usize],
// )
// .await;
send_verification(
stake_sender,
stakestore,
"Merge Source",
account_keys[instruction.accounts[1] as usize],
)
.await;
// send_verification(
// stake_sender,
// stakestore,
// "Merge Source",
// account_keys[instruction.accounts[1] as usize],
// )
// .await;
}
}
}
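
extract_stakestore/merge_stakestore above move the store out with std::mem::take, let the caller work on the maps (and now the stake history) off to the side, and buffer any notifications that arrive in between so they can be replayed at merge time (the real code takes the whole StakeStore by value and sorts the buffered actions by update slot; both are simplified here). A stripped-down sketch of that extract/buffer/merge cycle, with u64 keys and values standing in for the real account types:

use std::collections::HashMap;

#[derive(Default)]
struct Store {
    stakes: HashMap<u64, u64>, // simplified: account id -> lamports
    updates: Vec<(u64, u64)>,  // notifications buffered while extracted
    extracted: bool,
}

impl Store {
    // Hand the map out; the store keeps only the buffer.
    fn extract(&mut self) -> Result<HashMap<u64, u64>, &'static str> {
        if self.extracted {
            return Err("already extracted, try later");
        }
        self.extracted = true;
        Ok(std::mem::take(&mut self.stakes))
    }

    // While extracted, changes are only buffered.
    fn notify(&mut self, key: u64, value: u64) {
        if self.extracted {
            self.updates.push((key, value));
        } else {
            self.stakes.insert(key, value);
        }
    }

    // Put the (possibly modified) map back and replay buffered updates in order.
    fn merge(&mut self, stakes: HashMap<u64, u64>) -> Result<(), &'static str> {
        if !self.extracted {
            return Err("merge of a non-extracted map");
        }
        self.stakes = stakes;
        self.extracted = false;
        for (key, value) in std::mem::take(&mut self.updates) {
            self.stakes.insert(key, value);
        }
        Ok(())
    }
}

fn main() {
    let mut store = Store::default();
    store.notify(1, 100);
    let mut map = store.extract().unwrap();
    store.notify(2, 200); // arrives during the leader schedule computation: buffered
    map.insert(3, 300);   // work done on the extracted map
    store.merge(map).unwrap();
    assert_eq!(store.stakes.len(), 3);
}
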

View File

@ -6,8 +6,9 @@ use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;
use std::sync::Arc;
pub type VoteMap = HashMap<Pubkey, StoredVote>;
pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
//Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct
//to avoid adding a dep to solana_rpc_client that create
@ -104,32 +105,50 @@ impl VoteStore {
new_account: AccountPretty,
current_end_epoch_slot: Slot,
) -> anyhow::Result<()> {
let Ok(vote_data) = new_account.read_vote() else {
bail!("Can't read Vote from account data");
};
if new_account.lamports == 0 {
self.remove_from_store(&new_account.pubkey, new_account.slot);
} else {
let Ok(vote_data) = new_account.read_vote() else {
bail!("Can't read Vote from account data");
};
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
let new_voteacc = StoredVote {
pubkey: new_account.pubkey,
vote_data,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
let new_voteacc = StoredVote {
pubkey: new_account.pubkey,
vote_data,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
//during extract, don't insert now an account change that
//has been done in the next epoch; push the new update
//into the update pool to be merged at the next epoch change.
let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
match insert_stake {
false => self.updates.push((new_account.pubkey, new_voteacc)),
true => self.insert_vote(new_account.pubkey, new_voteacc),
//during extract, don't insert now an account change that
//has been done in the next epoch; push the new update
//into the update pool to be merged at the next epoch change.
let insert_stake =
!self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
match insert_stake {
false => self.updates.push((new_account.pubkey, new_voteacc)),
true => self.insert_vote(new_account.pubkey, new_voteacc),
}
}
Ok(())
}
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
}
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
if self
.votes
.get(account_pk)
.map(|vote| vote.last_update_slot <= update_slot)
.unwrap_or(true)
{
log::info!("Vote remove_from_store for {}", account_pk.to_string());
self.votes.remove(account_pk);
}
}
}
pub fn merge_program_account_in_vote_map(
@ -180,7 +199,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
);
}
*voteacc = vote_data;
*voteacc = Arc::new(vote_data);
}
}
// If value doesn't exist yet, then insert a new value of 1
@ -190,7 +209,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
vote_data.vote_data.node_pubkey,
vote_data.vote_data.root_slot,
);
vacant.insert(vote_data);
vacant.insert(Arc::new(vote_data));
}
};
}
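
Switching VoteMap to Arc<StoredVote> lets the epoch stake snapshots (HashMap<Pubkey, (u64, Arc<StoredVote>)>) share vote data with the live map instead of deep-copying it; replacing an entry just swaps in a new Arc. A small sketch of that sharing, with StoredVote reduced to two illustrative fields and string keys standing in for Pubkey:

use std::collections::HashMap;
use std::sync::Arc;

// Reduced stand-in for the crate's StoredVote.
struct StoredVote {
    node_pubkey: String,
    last_update_slot: u64,
}

fn main() {
    let mut vote_map: HashMap<&'static str, Arc<StoredVote>> = HashMap::new();
    vote_map.insert(
        "vote1",
        Arc::new(StoredVote { node_pubkey: "node1".into(), last_update_slot: 10 }),
    );

    // Building an epoch stake snapshot clones Arcs, not the vote state itself.
    let snapshot: HashMap<&'static str, (u64, Arc<StoredVote>)> = vote_map
        .iter()
        .map(|(pk, vote)| (*pk, (1_000u64, Arc::clone(vote))))
        .collect();

    // A newer account update swaps in a fresh Arc; the snapshot keeps the
    // version it was computed with.
    vote_map.insert(
        "vote1",
        Arc::new(StoredVote { node_pubkey: "node1".into(), last_update_slot: 25 }),
    );

    assert_eq!(snapshot["vote1"].1.last_update_slot, 10);
    assert_eq!(vote_map["vote1"].last_update_slot, 25);
    println!(
        "{} snapshot slot:{} live slot:{}",
        vote_map["vote1"].node_pubkey,
        snapshot["vote1"].1.last_update_slot,
        vote_map["vote1"].last_update_slot
    );
}
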