Compare commits
3 Commits
2dc96a7a50
...
1c6aa5fe8b
Author | SHA1 | Date |
---|---|---|
musitdev | 1c6aa5fe8b | |
musitdev | f91965e1dd | |
musitdev | db8a3079cb |
|
@ -1,12 +1,12 @@
|
|||
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
||||
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
||||
use crate::Slot;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use solana_client::client_error::ClientError;
|
||||
use solana_client::rpc_client::RpcClient;
|
||||
use solana_sdk::account::Account;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::stake_history::StakeHistory;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
/*
|
||||
|
@ -64,7 +64,7 @@ pub enum BootstrapEvent {
|
|||
Vec<(Pubkey, Account)>,
|
||||
Account,
|
||||
),
|
||||
AccountsMerged(StakeMap, VoteMap),
|
||||
AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap),
|
||||
Exit,
|
||||
}
|
||||
|
||||
|
@ -76,8 +76,6 @@ enum BootsrapProcessResult {
|
|||
|
||||
pub struct BootstrapData {
|
||||
pub done: bool,
|
||||
pub current_epoch: u64,
|
||||
pub next_epoch_start_slot: Slot,
|
||||
pub sleep_time: u64,
|
||||
pub rpc_url: String,
|
||||
}
|
||||
|
@ -116,7 +114,7 @@ fn process_bootstrap_event(
|
|||
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
|
||||
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
|
||||
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
||||
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event(
|
||||
(Ok((stake_map, _)), Ok(vote_map)) => BootsrapProcessResult::Event(
|
||||
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
|
||||
),
|
||||
_ => {
|
||||
|
@ -131,27 +129,20 @@ fn process_bootstrap_event(
|
|||
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
|
||||
log::info!("BootstrapEvent::StoreExtracted RECV");
|
||||
|
||||
match crate::stakestore::read_historystake_from_account(history) {
|
||||
Some(stake_history) => {
|
||||
log::info!(
|
||||
"Read stake history done with history len:{}",
|
||||
stake_history.len()
|
||||
);
|
||||
stakestore.set_stake_history(stake_history);
|
||||
}
|
||||
None => log::error!("Bootstrap error, can't read stake history."),
|
||||
let stake_history = crate::stakestore::read_historystake_from_account(history);
|
||||
if stake_history.is_none() {
|
||||
//TODO return error.
|
||||
log::error!("Bootstrap error, can't read stake history.");
|
||||
}
|
||||
|
||||
//merge new PA with stake map and vote map in a specific task
|
||||
let jh = tokio::task::spawn_blocking({
|
||||
let current_epoch = data.current_epoch;
|
||||
move || {
|
||||
//update pa_list to set slot update to start epoq one.
|
||||
crate::stakestore::merge_program_account_in_strake_map(
|
||||
&mut stake_map,
|
||||
stakes,
|
||||
0, //with RPC no way to know the slot of the account update. Set to 0.
|
||||
current_epoch,
|
||||
);
|
||||
crate::votestore::merge_program_account_in_vote_map(
|
||||
&mut vote_map,
|
||||
|
@ -159,15 +150,15 @@ fn process_bootstrap_event(
|
|||
0, //with RPC no way to know the slot of the account update. Set to 0.
|
||||
);
|
||||
|
||||
BootstrapEvent::AccountsMerged(stake_map, vote_map)
|
||||
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map)
|
||||
}
|
||||
});
|
||||
BootsrapProcessResult::TaskHandle(jh)
|
||||
}
|
||||
BootstrapEvent::AccountsMerged(stake_map, vote_map) => {
|
||||
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map) => {
|
||||
log::info!("BootstrapEvent::AccountsMerged RECV");
|
||||
match (
|
||||
merge_stakestore(stakestore, stake_map, data.current_epoch),
|
||||
merge_stakestore(stakestore, stake_map, stake_history),
|
||||
merge_votestore(votestore, vote_map),
|
||||
) {
|
||||
(Ok(()), Ok(())) => BootsrapProcessResult::End,
|
||||
|
|
|
@ -1,132 +1,150 @@
|
|||
use crate::leader_schedule::LeaderScheduleEvent;
|
||||
use crate::Slot;
|
||||
use anyhow::bail;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use solana_account_decoder::parse_sysvar::SysvarAccountType;
|
||||
use solana_client::client_error::ClientError;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
||||
use solana_sdk::epoch_info::EpochInfo;
|
||||
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
|
||||
use std::sync::Arc;
|
||||
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
|
||||
|
||||
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
|
||||
pub struct Epoch {
|
||||
pub epoch: u64,
|
||||
pub slot_index: u64,
|
||||
pub slots_in_epoch: u64,
|
||||
pub absolute_slot: Slot,
|
||||
}
|
||||
|
||||
impl Epoch {
|
||||
pub fn get_next_epoch(&self, current_epoch: &CurrentEpochSlotState) -> Epoch {
|
||||
let last_slot = current_epoch.epoch_cache.get_last_slot_in_epoch(self.epoch);
|
||||
current_epoch.epoch_cache.get_epoch_at_slot(last_slot + 1)
|
||||
}
|
||||
|
||||
pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
|
||||
EpochInfo {
|
||||
epoch: self.epoch,
|
||||
slot_index: self.slot_index,
|
||||
slots_in_epoch: self.slots_in_epoch,
|
||||
absolute_slot: self.absolute_slot,
|
||||
block_height,
|
||||
transaction_count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
|
||||
let slots_in_epoch = current_epoch.current_epoch.slots_in_epoch;
|
||||
let epoch_start_slot = current_epoch.current_epoch_start_slot();
|
||||
if slot >= epoch_start_slot {
|
||||
let slot_distance = (slot - epoch_start_slot) / slots_in_epoch;
|
||||
current_epoch.current_epoch.epoch + slot_distance
|
||||
} else {
|
||||
let slot_distance = (epoch_start_slot - slot) / slots_in_epoch;
|
||||
current_epoch.current_epoch.epoch - slot_distance
|
||||
current_epoch.epoch_cache.get_epoch_at_slot(slot).epoch
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EpochCache {
|
||||
epoch_schedule: Arc<EpochSchedule>,
|
||||
}
|
||||
|
||||
impl EpochCache {
|
||||
pub fn get_epoch_at_slot(&self, slot: Slot) -> Epoch {
|
||||
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
|
||||
let slots_in_epoch = self.epoch_schedule.get_slots_in_epoch(epoch);
|
||||
Epoch {
|
||||
epoch,
|
||||
slot_index,
|
||||
slots_in_epoch,
|
||||
absolute_slot: slot,
|
||||
}
|
||||
}
|
||||
|
||||
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
|
||||
// self.epoch_schedule.as_ref().clone()
|
||||
// }
|
||||
|
||||
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
|
||||
self.epoch_schedule.get_slots_in_epoch(epoch)
|
||||
}
|
||||
|
||||
// pub fn get_first_slot_in_epoch(&self, epoch: u64) -> u64 {
|
||||
// self.epoch_schedule.get_first_slot_in_epoch(epoch)
|
||||
// }
|
||||
|
||||
pub fn get_last_slot_in_epoch(&self, epoch: u64) -> u64 {
|
||||
self.epoch_schedule.get_last_slot_in_epoch(epoch)
|
||||
}
|
||||
|
||||
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
|
||||
let res_epoch = rpc_client
|
||||
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
|
||||
.await?;
|
||||
let Some(SysvarAccountType::EpochSchedule(epoch_schedule)) =
|
||||
bincode::deserialize(&res_epoch.data[..])
|
||||
.ok()
|
||||
.map(SysvarAccountType::EpochSchedule)
|
||||
else {
|
||||
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
|
||||
};
|
||||
|
||||
Ok(EpochCache {
|
||||
epoch_schedule: Arc::new(epoch_schedule),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CurrentEpochSlotState {
|
||||
pub current_slot: CurrentSlot,
|
||||
pub current_epoch: EpochInfo,
|
||||
pub next_epoch_start_slot: Slot,
|
||||
pub first_epoch_slot: bool,
|
||||
epoch_cache: EpochCache,
|
||||
current_epoch_value: Epoch,
|
||||
}
|
||||
|
||||
impl CurrentEpochSlotState {
|
||||
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> {
|
||||
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
|
||||
// self.epoch_cache.get_epoch_shedule()
|
||||
// }
|
||||
|
||||
pub fn get_current_epoch(&self) -> Epoch {
|
||||
self.current_epoch_value
|
||||
}
|
||||
|
||||
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
|
||||
self.epoch_cache.get_slots_in_epoch(epoch)
|
||||
}
|
||||
|
||||
pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> {
|
||||
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
|
||||
|
||||
//get reduce_stake_warmup_cooldown feature info.
|
||||
//NOT AND ACCOUNT. Get from config.
|
||||
// let reduce_stake_warmup_cooldown_epoch = rpc_client
|
||||
// .get_account(&feature_set::reduce_stake_warmup_cooldown::id())
|
||||
// .await?;
|
||||
let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?;
|
||||
|
||||
// let reduce_stake_warmup_cooldown_epoch = bincode::deserialize(&reduce_stake_warmup_cooldown_epoch.data[..])
|
||||
// .ok()
|
||||
// .map(SysvarAccountType::EpochSchedule);
|
||||
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
|
||||
|
||||
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
|
||||
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
|
||||
let res_epoch = rpc_client
|
||||
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
|
||||
.await?;
|
||||
let sysvar_epoch_schedule = bincode::deserialize(&res_epoch.data[..])
|
||||
.ok()
|
||||
.map(SysvarAccountType::EpochSchedule);
|
||||
log::info!("sysvar_epoch_schedule {sysvar_epoch_schedule:?}");
|
||||
|
||||
// Fetch current epoch
|
||||
let current_epoch = rpc_client.get_epoch_info().await?;
|
||||
let next_epoch_start_slot =
|
||||
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
|
||||
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
|
||||
Ok(CurrentEpochSlotState {
|
||||
current_slot: Default::default(),
|
||||
current_epoch,
|
||||
next_epoch_start_slot,
|
||||
first_epoch_slot: false,
|
||||
current_slot: CurrentSlot::default(),
|
||||
epoch_cache,
|
||||
current_epoch_value: Epoch::default(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn current_epoch_start_slot(&self) -> Slot {
|
||||
self.current_epoch.absolute_slot - self.current_epoch.slot_index
|
||||
}
|
||||
// pub fn current_epoch_start_slot(&self) -> Slot {
|
||||
// self.epoch_cache
|
||||
// .get_first_slot_in_epoch(self.current_slot.confirmed_slot)
|
||||
// }
|
||||
|
||||
pub fn current_epoch_end_slot(&self) -> Slot {
|
||||
self.next_epoch_start_slot - 1
|
||||
self.epoch_cache
|
||||
.get_last_slot_in_epoch(self.current_epoch_value.epoch)
|
||||
}
|
||||
|
||||
pub fn process_new_slot(
|
||||
&mut self,
|
||||
new_slot: &SubscribeUpdateSlot,
|
||||
) -> Option<LeaderScheduleEvent> {
|
||||
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
|
||||
//for the first update of slot correct epoch info data.
|
||||
if self.current_slot.confirmed_slot == 0 {
|
||||
let diff = new_slot.slot - self.current_epoch.absolute_slot;
|
||||
self.current_epoch.slot_index += diff;
|
||||
self.current_epoch.absolute_slot = new_slot.slot;
|
||||
log::trace!(
|
||||
"Set current epoch with diff:{diff} slot:{} current:{:?}",
|
||||
new_slot.slot,
|
||||
self.current_epoch,
|
||||
);
|
||||
//if new slot update slot related state data.
|
||||
} else if new_slot.slot > self.current_slot.confirmed_slot {
|
||||
//update epoch info
|
||||
let mut diff = new_slot.slot - self.current_slot.confirmed_slot;
|
||||
//First epoch slot, index is 0 so remove 1 from diff.
|
||||
if self.first_epoch_slot {
|
||||
//calculate next epoch data
|
||||
self.current_epoch.epoch += 1;
|
||||
//slot can be non consecutif, use diff.
|
||||
self.current_epoch.slot_index = 0;
|
||||
self.current_epoch.absolute_slot = new_slot.slot;
|
||||
log::info!(
|
||||
"change_epoch calculated next epoch:{:?} at slot:{}",
|
||||
self.current_epoch,
|
||||
new_slot.slot,
|
||||
);
|
||||
|
||||
self.first_epoch_slot = false;
|
||||
//slot_index start at 0.
|
||||
diff -= 1; //diff is always >= 1
|
||||
self.next_epoch_start_slot =
|
||||
self.next_epoch_start_slot + self.current_epoch.slots_in_epoch;
|
||||
//set to next epochs.
|
||||
} else {
|
||||
self.current_epoch.absolute_slot = new_slot.slot;
|
||||
}
|
||||
self.current_epoch.slot_index += diff;
|
||||
|
||||
log::trace!(
|
||||
"Slot epoch with slot:{} , diff:{diff} current epoch state:{self:?}",
|
||||
new_slot.slot
|
||||
);
|
||||
}
|
||||
}
|
||||
//update slot state for all commitment.
|
||||
self.current_slot.update_slot(&new_slot);
|
||||
|
||||
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
|
||||
if self.current_epoch_value.epoch == 0 {
|
||||
//init first epoch
|
||||
self.current_epoch_value = self.epoch_cache.get_epoch_at_slot(new_slot.slot);
|
||||
}
|
||||
self.manage_change_epoch()
|
||||
} else {
|
||||
None
|
||||
|
@ -134,22 +152,17 @@ impl CurrentEpochSlotState {
|
|||
}
|
||||
|
||||
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
|
||||
//we change the epoch at the last slot of the current epoch.
|
||||
if self.current_slot.confirmed_slot >= self.current_epoch_end_slot() {
|
||||
log::info!(
|
||||
"End epoch slot detected:{}",
|
||||
self.current_slot.confirmed_slot
|
||||
);
|
||||
|
||||
//set epoch change effectif at next slot.
|
||||
self.first_epoch_slot = true;
|
||||
if self.current_slot.confirmed_slot > self.current_epoch_end_slot() {
|
||||
log::info!("Change epoch at slot:{}", self.current_slot.confirmed_slot);
|
||||
|
||||
self.current_epoch_value = self.get_current_epoch().get_next_epoch(&self);
|
||||
//start leader schedule calculus
|
||||
//switch to 2 next epoch to calculate schedule at next epoch.
|
||||
//at current epoch change the schedule is calculated for the next epoch.
|
||||
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&self.current_epoch);
|
||||
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&schedule_epoch);
|
||||
Some(LeaderScheduleEvent::InitLeaderschedule(schedule_epoch))
|
||||
Some(crate::leader_schedule::create_schedule_init_event(
|
||||
self.current_epoch_value.epoch,
|
||||
self.get_slots_in_epoch(self.current_epoch_value.epoch),
|
||||
Arc::clone(&self.epoch_cache.epoch_schedule),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
use crate::epoch::CurrentEpochSlotState;
|
||||
use crate::epoch::Epoch;
|
||||
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
||||
use crate::votestore::StoredVote;
|
||||
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
||||
use anyhow::bail;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use solana_client::rpc_client::RpcClient;
|
||||
use solana_ledger::leader_schedule::LeaderSchedule;
|
||||
use solana_program::sysvar::epoch_schedule::EpochSchedule;
|
||||
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::epoch_info::EpochInfo;
|
||||
use solana_sdk::feature_set::FeatureSet;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::stake::state::StakeActivationStatus;
|
||||
use solana_sdk::stake_history::StakeHistory;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
|
@ -34,75 +38,69 @@ pub struct CalculatedSchedule {
|
|||
#[derive(Debug)]
|
||||
pub struct LeaderScheduleData {
|
||||
pub schedule: Arc<HashMap<String, Vec<usize>>>,
|
||||
pub vote_stakes: Vec<(Pubkey, u64)>,
|
||||
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||
pub epoch: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EpochStake {
|
||||
epoch: u64,
|
||||
stakes: Vec<(Pubkey, u64)>,
|
||||
stake_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||
}
|
||||
|
||||
impl From<SavedStake> for EpochStake {
|
||||
fn from(saved_stakes: SavedStake) -> Self {
|
||||
EpochStake {
|
||||
epoch: saved_stakes.epoch,
|
||||
stakes: saved_stakes
|
||||
.stakes
|
||||
stake_vote_map: saved_stakes
|
||||
.stake_vote_map
|
||||
.into_iter()
|
||||
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), st))
|
||||
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next_schedule_epoch(current_epoch: &EpochInfo) -> EpochInfo {
|
||||
let mut next_epoch_info = current_epoch.clone();
|
||||
next_epoch_info.epoch += 1;
|
||||
next_epoch_info.slot_index = 0;
|
||||
next_epoch_info.absolute_slot =
|
||||
current_epoch.absolute_slot + current_epoch.slots_in_epoch - current_epoch.slot_index;
|
||||
next_epoch_info
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
struct SavedStake {
|
||||
epoch: u64,
|
||||
stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
|
||||
}
|
||||
|
||||
pub fn bootstrap_leader_schedule(
|
||||
current_file_patch: &str,
|
||||
next_file_patch: &str,
|
||||
slots_in_epoch: u64,
|
||||
epoch_state: &CurrentEpochSlotState,
|
||||
) -> anyhow::Result<CalculatedSchedule> {
|
||||
let mut current_stakes = read_schedule_vote_stakes(current_file_patch)?;
|
||||
let mut next_stakes = read_schedule_vote_stakes(next_file_patch)?;
|
||||
let current_stakes = read_schedule_vote_stakes(current_file_patch)?;
|
||||
let next_stakes = read_schedule_vote_stakes(next_file_patch)?;
|
||||
|
||||
//calcualte leader schedule for all vote stakes.
|
||||
let current_schedule = calculate_leader_schedule(
|
||||
&mut current_stakes.stakes,
|
||||
¤t_stakes.stake_vote_map,
|
||||
current_stakes.epoch,
|
||||
slots_in_epoch,
|
||||
)?;
|
||||
let next_schedule =
|
||||
calculate_leader_schedule(&mut next_stakes.stakes, next_stakes.epoch, slots_in_epoch)?;
|
||||
epoch_state.get_slots_in_epoch(current_stakes.epoch),
|
||||
);
|
||||
let next_schedule = calculate_leader_schedule(
|
||||
&next_stakes.stake_vote_map,
|
||||
next_stakes.epoch,
|
||||
epoch_state.get_slots_in_epoch(next_stakes.epoch),
|
||||
);
|
||||
|
||||
Ok(CalculatedSchedule {
|
||||
current: Some(LeaderScheduleData {
|
||||
schedule: Arc::new(current_schedule),
|
||||
vote_stakes: current_stakes.stakes,
|
||||
epoch: current_stakes.epoch,
|
||||
vote_stakes: current_stakes.stake_vote_map,
|
||||
epoch: epoch_state.get_slots_in_epoch(current_stakes.epoch),
|
||||
}),
|
||||
next: Some(LeaderScheduleData {
|
||||
schedule: Arc::new(next_schedule),
|
||||
vote_stakes: next_stakes.stakes,
|
||||
epoch: next_stakes.epoch,
|
||||
vote_stakes: next_stakes.stake_vote_map,
|
||||
epoch: epoch_state.get_slots_in_epoch(next_stakes.epoch),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
struct SavedStake {
|
||||
epoch: u64,
|
||||
stakes: Vec<(String, u64)>,
|
||||
}
|
||||
|
||||
fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
|
||||
let content = std::fs::read_to_string(file_path)?;
|
||||
let stakes_str: SavedStake = serde_json::from_str(&content)?;
|
||||
|
@ -112,16 +110,16 @@ fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
|
|||
|
||||
fn save_schedule_vote_stakes(
|
||||
base_file_path: &str,
|
||||
stakes: &Vec<(Pubkey, u64)>,
|
||||
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||
epoch: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
//save new schedule for restart.
|
||||
let filename = format!("{base_file_path}_{epoch}.json",);
|
||||
let filename = format!("{base_file_path}_{}.json", epoch);
|
||||
let save_stakes = SavedStake {
|
||||
epoch,
|
||||
stakes: stakes
|
||||
stake_vote_map: stake_vote_map
|
||||
.iter()
|
||||
.map(|(pk, st)| (pk.to_string(), *st))
|
||||
.map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
|
||||
.collect(),
|
||||
};
|
||||
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
|
||||
|
@ -157,11 +155,8 @@ pub fn run_leader_schedule_events(
|
|||
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
|
||||
stakestore: &mut StakeStore,
|
||||
votestore: &mut VoteStore,
|
||||
) -> Option<(
|
||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
||||
EpochInfo,
|
||||
)> {
|
||||
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
|
||||
) -> Option<LeaderScheduleData> {
|
||||
let result = process_leadershedule_event(event, stakestore, votestore);
|
||||
match result {
|
||||
LeaderScheduleResult::TaskHandle(jh) => {
|
||||
bootstrap_tasks.push(jh);
|
||||
|
@ -170,132 +165,155 @@ pub fn run_leader_schedule_events(
|
|||
LeaderScheduleResult::Event(event) => {
|
||||
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
|
||||
}
|
||||
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
|
||||
LeaderScheduleResult::End(schedule) => Some(schedule),
|
||||
}
|
||||
}
|
||||
|
||||
// struct LeaderScheduleEpochData {
|
||||
// current_epoch: u64,
|
||||
// next_epoch: u64,
|
||||
// slots_in_epoch: u64,
|
||||
// epoch_schedule: Arc<EpochSchedule>,
|
||||
// }
|
||||
|
||||
pub enum LeaderScheduleEvent {
|
||||
InitLeaderschedule(EpochInfo),
|
||||
CalculateScedule(StakeMap, VoteMap, EpochInfo, Option<StakeHistory>),
|
||||
Init(u64, u64, Arc<EpochSchedule>),
|
||||
MergeStoreAndSaveSchedule(
|
||||
StakeMap,
|
||||
VoteMap,
|
||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
||||
EpochInfo,
|
||||
LeaderScheduleData,
|
||||
(u64, u64, Arc<EpochSchedule>),
|
||||
Option<StakeHistory>,
|
||||
),
|
||||
}
|
||||
|
||||
enum LeaderScheduleResult {
|
||||
TaskHandle(JoinHandle<LeaderScheduleEvent>),
|
||||
Event(LeaderScheduleEvent),
|
||||
End(
|
||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
||||
EpochInfo,
|
||||
),
|
||||
End(LeaderScheduleData),
|
||||
}
|
||||
|
||||
pub fn create_schedule_init_event(
|
||||
current_epoch: u64,
|
||||
slots_in_epoch: u64,
|
||||
epoch_schedule: Arc<EpochSchedule>,
|
||||
) -> LeaderScheduleEvent {
|
||||
log::info!("LeaderScheduleEvent::InitLeaderschedule START");
|
||||
//TODO get a way to be updated of stake history.
|
||||
//request the current state using RPC.
|
||||
//TODO remove the await from the scheduler task.
|
||||
// let before_stake_histo = stake_history.clone();
|
||||
// let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
|
||||
// .map(|account| crate::stakestore::read_historystake_from_account(account))
|
||||
// .unwrap_or_else(|_| {
|
||||
// log::error!("Error during stake history fetch. Use bootstrap one");
|
||||
// stake_history.map(|st| st.as_ref().clone())
|
||||
// });
|
||||
// log::info!(
|
||||
// "stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
|
||||
// epoch_data.current_epoch
|
||||
// );
|
||||
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule)
|
||||
}
|
||||
|
||||
//TODO remove desactivated account after leader schedule calculus.
|
||||
fn process_leadershedule_event(
|
||||
rpc_url: String,
|
||||
// rpc_url: String,
|
||||
event: LeaderScheduleEvent,
|
||||
stakestore: &mut StakeStore,
|
||||
votestore: &mut VoteStore,
|
||||
) -> LeaderScheduleResult {
|
||||
match event {
|
||||
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => {
|
||||
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
|
||||
//For test TODO put in extract and restore process to avoid to clone.
|
||||
let stake_history = stakestore.get_cloned_stake_history();
|
||||
//TODO get a way to be updated of stake history.
|
||||
//request the current state using RPC.
|
||||
//TODO remove the await from the scheduler task.
|
||||
let before_stake_histo = stake_history.clone();
|
||||
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
|
||||
.map(|account| crate::stakestore::read_historystake_from_account(account))
|
||||
.unwrap_or_else(|_| {
|
||||
log::error!("Error during stake history fetch. Use bootstrap one");
|
||||
stake_history
|
||||
});
|
||||
log::info!(
|
||||
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
|
||||
schedule_epoch.epoch
|
||||
);
|
||||
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule) => {
|
||||
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
||||
(Ok(stake_map), Ok(vote_map)) => {
|
||||
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
|
||||
stake_map,
|
||||
vote_map,
|
||||
schedule_epoch,
|
||||
stake_history,
|
||||
))
|
||||
}
|
||||
_ => {
|
||||
log::warn!("process_leadershedule_event error during extract store");
|
||||
let jh = tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch)
|
||||
});
|
||||
LeaderScheduleResult::TaskHandle(jh)
|
||||
}
|
||||
}
|
||||
}
|
||||
LeaderScheduleEvent::CalculateScedule(
|
||||
stake_map,
|
||||
vote_map,
|
||||
schedule_epoch,
|
||||
stake_history,
|
||||
) => {
|
||||
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
|
||||
(Ok((stake_map, mut stake_history)), Ok(vote_map)) => {
|
||||
//For test TODO put in extract and restore process to avoid to clone.
|
||||
log::info!("LeaderScheduleEvent::CalculateScedule");
|
||||
let epoch_schedule = Arc::clone(&epoch_schedule);
|
||||
let jh = tokio::task::spawn_blocking({
|
||||
move || {
|
||||
let schedule_and_stakes =
|
||||
crate::leader_schedule::calculate_leader_schedule_from_stake_map(
|
||||
let next_epoch = current_epoch + 1;
|
||||
let epoch_vote_stakes = calculate_epoch_stakes(
|
||||
&stake_map,
|
||||
&vote_map,
|
||||
&schedule_epoch,
|
||||
stake_history.as_ref(),
|
||||
current_epoch,
|
||||
next_epoch,
|
||||
stake_history.as_mut(),
|
||||
&epoch_schedule,
|
||||
);
|
||||
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
|
||||
|
||||
let leader_schedule = calculate_leader_schedule(
|
||||
&epoch_vote_stakes,
|
||||
next_epoch,
|
||||
slots_in_epoch,
|
||||
);
|
||||
|
||||
// let epoch_vote_stakes_old = calculate_epoch_stakes_old_algo(
|
||||
// &stake_map,
|
||||
// &vote_map,
|
||||
// &epoch_state.next_epoch,
|
||||
// stake_history.as_ref(),
|
||||
// );
|
||||
|
||||
// let leader_schedule_old =
|
||||
// calculate_leader_schedule(&epoch_vote_stakes_old, &next_epoch);
|
||||
if let Err(err) = save_schedule_vote_stakes(
|
||||
SCHEDULE_STAKE_BASE_FILE_NAME,
|
||||
vote_stakes,
|
||||
schedule_epoch.epoch,
|
||||
&epoch_vote_stakes,
|
||||
next_epoch,
|
||||
) {
|
||||
log::error!(
|
||||
"Error during saving in file of the new schedule of epoch:{} error:{err}",
|
||||
schedule_epoch.epoch
|
||||
next_epoch
|
||||
);
|
||||
}
|
||||
}
|
||||
log::info!("End calculate leader schedule");
|
||||
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
||||
stake_map,
|
||||
vote_map,
|
||||
schedule_and_stakes,
|
||||
schedule_epoch,
|
||||
LeaderScheduleData {
|
||||
schedule: Arc::new(leader_schedule),
|
||||
vote_stakes: epoch_vote_stakes,
|
||||
epoch: next_epoch,
|
||||
},
|
||||
(current_epoch, slots_in_epoch, epoch_schedule),
|
||||
stake_history,
|
||||
)
|
||||
}
|
||||
});
|
||||
LeaderScheduleResult::TaskHandle(jh)
|
||||
}
|
||||
_ => {
|
||||
log::error!("Create leadershedule init event error during extract store");
|
||||
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
|
||||
current_epoch,
|
||||
slots_in_epoch,
|
||||
epoch_schedule,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
||||
stake_map,
|
||||
vote_map,
|
||||
schedule,
|
||||
schedule_epoch,
|
||||
schedule_data,
|
||||
(current_epoch, slots_in_epoch, epoch_schedule),
|
||||
stake_history,
|
||||
) => {
|
||||
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
|
||||
match (
|
||||
merge_stakestore(stakestore, stake_map, schedule_epoch.epoch),
|
||||
merge_stakestore(stakestore, stake_map, stake_history),
|
||||
merge_votestore(votestore, vote_map),
|
||||
) {
|
||||
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule, schedule_epoch),
|
||||
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
|
||||
_ => {
|
||||
//this shoud never arrive because the store has been extracted before.
|
||||
//TODO remove this error using type state
|
||||
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
|
||||
LeaderScheduleResult::Event(LeaderScheduleEvent::InitLeaderschedule(
|
||||
schedule_epoch,
|
||||
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
|
||||
current_epoch,
|
||||
slots_in_epoch,
|
||||
epoch_schedule,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -303,65 +321,68 @@ fn process_leadershedule_event(
|
|||
}
|
||||
}
|
||||
|
||||
// fn calculate_epoch_stakes(stake_map: &StakeMap) {
|
||||
// let stake_history_entry =
|
||||
fn calculate_epoch_stakes(
|
||||
stake_map: &StakeMap,
|
||||
vote_map: &VoteMap,
|
||||
current_epoch: u64,
|
||||
next_epoch: u64,
|
||||
mut stake_history: Option<&mut StakeHistory>,
|
||||
epoch_schedule: &EpochSchedule,
|
||||
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
|
||||
//update stake history with current end epoch stake values.
|
||||
let new_rate_activation_epoch =
|
||||
FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule);
|
||||
let stake_history_entry =
|
||||
stake_map
|
||||
.values()
|
||||
.fold(StakeActivationStatus::default(), |acc, stake_account| {
|
||||
let delegation = stake_account.stake;
|
||||
acc + delegation.stake_activating_and_deactivating(
|
||||
current_epoch,
|
||||
stake_history.as_deref(),
|
||||
new_rate_activation_epoch,
|
||||
)
|
||||
});
|
||||
match stake_history {
|
||||
Some(ref mut stake_history) => stake_history.add(current_epoch, stake_history_entry),
|
||||
None => log::warn!("Vote stake calculus without Stake History"),
|
||||
};
|
||||
|
||||
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
|
||||
// // Wrap up the prev epoch by adding new stake history entry for the
|
||||
// // prev epoch.
|
||||
// let stake_history_entry = thread_pool.install(|| {
|
||||
// stake_delegations
|
||||
// .par_iter()
|
||||
// .fold(StakeActivationStatus::default, |acc, stake_account| {
|
||||
// let delegation = stake_account.delegation();
|
||||
// acc + delegation.stake_activating_and_deactivating(
|
||||
// self.epoch,
|
||||
// Some(&self.stake_history),
|
||||
// new_rate_activation_epoch,
|
||||
// )
|
||||
// })
|
||||
// .reduce(StakeActivationStatus::default, Add::add)
|
||||
// });
|
||||
// self.stake_history.add(self.epoch, stake_history_entry);
|
||||
// self.epoch = next_epoch;
|
||||
// // Refresh the stake distribution of vote accounts for the next epoch,
|
||||
// // using new stake history.
|
||||
// let delegated_stakes = thread_pool.install(|| {
|
||||
// stake_delegations
|
||||
// .par_iter()
|
||||
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
|
||||
// let delegation = stake_account.delegation();
|
||||
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
|
||||
// *entry += delegation.stake(
|
||||
// self.epoch,
|
||||
// Some(&self.stake_history),
|
||||
// new_rate_activation_epoch,
|
||||
// );
|
||||
// delegated_stakes
|
||||
// })
|
||||
// .reduce(HashMap::default, merge)
|
||||
// });
|
||||
// self.vote_accounts = self
|
||||
// .vote_accounts
|
||||
// .iter()
|
||||
// .map(|(&vote_pubkey, vote_account)| {
|
||||
// let delegated_stake = delegated_stakes
|
||||
// .get(&vote_pubkey)
|
||||
// .copied()
|
||||
// .unwrap_or_default();
|
||||
// (vote_pubkey, (delegated_stake, vote_account.clone()))
|
||||
// })
|
||||
// .collect();
|
||||
//calculate schedule stakes.
|
||||
let delegated_stakes: HashMap<Pubkey, u64> =
|
||||
stake_map
|
||||
.values()
|
||||
.fold(HashMap::default(), |mut delegated_stakes, stake_account| {
|
||||
let delegation = stake_account.stake;
|
||||
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
|
||||
*entry += delegation.stake(
|
||||
next_epoch,
|
||||
stake_history.as_deref(),
|
||||
new_rate_activation_epoch,
|
||||
);
|
||||
delegated_stakes
|
||||
});
|
||||
|
||||
// }
|
||||
let staked_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)> = vote_map
|
||||
.values()
|
||||
.map(|vote_account| {
|
||||
let delegated_stake = delegated_stakes
|
||||
.get(&vote_account.pubkey)
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
(vote_account.pubkey, (delegated_stake, vote_account.clone()))
|
||||
})
|
||||
.collect();
|
||||
staked_vote_map
|
||||
}
|
||||
|
||||
fn calculate_leader_schedule_from_stake_map(
|
||||
stake_map: &crate::stakestore::StakeMap,
|
||||
vote_map: &crate::votestore::VoteMap,
|
||||
current_epoch_info: &EpochInfo,
|
||||
fn calculate_epoch_stakes_old_algo(
|
||||
stake_map: &StakeMap,
|
||||
vote_map: &VoteMap,
|
||||
next_epoch: &Epoch,
|
||||
stake_history: Option<&StakeHistory>,
|
||||
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
|
||||
let mut stakes = HashMap::<Pubkey, u64>::new();
|
||||
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
|
||||
let mut stakes = HashMap::<Pubkey, (u64, Arc<StoredVote>)>::new();
|
||||
log::info!(
|
||||
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
|
||||
vote_map.len(),
|
||||
|
@ -369,7 +390,7 @@ fn calculate_leader_schedule_from_stake_map(
|
|||
stake_history.as_ref().map(|h| h.len())
|
||||
);
|
||||
|
||||
let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
|
||||
let ten_epoch_slot_long = 10 * next_epoch.slots_in_epoch;
|
||||
|
||||
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
|
||||
for storestake in stake_map.values() {
|
||||
|
@ -392,47 +413,149 @@ fn calculate_leader_schedule_from_stake_map(
|
|||
"leader schedule vote:{} with None root_slot, add it",
|
||||
vote_account.pubkey
|
||||
);
|
||||
current_epoch_info.absolute_slot
|
||||
}) < current_epoch_info
|
||||
.absolute_slot
|
||||
.saturating_sub(ten_epoch_slot_long)
|
||||
next_epoch.absolute_slot
|
||||
}) < next_epoch.absolute_slot.saturating_sub(ten_epoch_slot_long)
|
||||
{
|
||||
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||
, storestake.stake.voter_pubkey
|
||||
,vote_account.vote_data.node_pubkey
|
||||
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||
, storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
|
||||
, storestake.stake.stake(next_epoch.epoch, stake_history, Some(0)),
|
||||
);
|
||||
} else {
|
||||
let effective_stake = storestake
|
||||
.stake
|
||||
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
||||
//only vote account with positive stake are use for the schedule.
|
||||
if effective_stake > 0 {
|
||||
*(stakes
|
||||
.entry(vote_account.vote_data.node_pubkey)
|
||||
.or_insert(0)) += effective_stake;
|
||||
} else {
|
||||
log::info!(
|
||||
"leader schedule vote:{} with 0 effective vote",
|
||||
storestake.stake.voter_pubkey
|
||||
);
|
||||
.stake(next_epoch.epoch, stake_history, Some(0));
|
||||
|
||||
stakes
|
||||
.entry(vote_account.pubkey)
|
||||
.or_insert((0, vote_account.clone()))
|
||||
.0 += effective_stake;
|
||||
|
||||
// //only vote account with positive stake are use for the schedule.
|
||||
// if effective_stake > 0 {
|
||||
// *(stakes
|
||||
// .entry(vote_account.vote_data.node_pubkey)
|
||||
// .or_insert(0)) += effective_stake;
|
||||
// } else {
|
||||
// log::info!(
|
||||
// "leader schedule vote:{} with 0 effective vote",
|
||||
// storestake.stake.voter_pubkey
|
||||
// );
|
||||
// }
|
||||
}
|
||||
}
|
||||
stakes
|
||||
}
|
||||
|
||||
let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
|
||||
schedule_stakes.extend(stakes.drain());
|
||||
//Copied from leader_schedule_utils.rs
|
||||
// Mostly cribbed from leader_schedule_utils
|
||||
fn calculate_leader_schedule(
|
||||
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||
epoch: u64,
|
||||
slots_in_epoch: u64,
|
||||
) -> HashMap<String, Vec<usize>> {
|
||||
let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map
|
||||
.iter()
|
||||
.filter_map(|(pk, (stake, _))| (*stake > 0).then_some((*pk, *stake)))
|
||||
.collect();
|
||||
|
||||
let leader_schedule = calculate_leader_schedule(
|
||||
&mut schedule_stakes,
|
||||
current_epoch_info.epoch,
|
||||
current_epoch_info.slots_in_epoch,
|
||||
)?;
|
||||
Ok((leader_schedule, schedule_stakes))
|
||||
let mut seed = [0u8; 32];
|
||||
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
||||
sort_stakes(&mut stakes);
|
||||
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
|
||||
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
||||
|
||||
let slot_schedule = schedule
|
||||
.get_slot_leaders()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, pk)| (pk.to_string(), i))
|
||||
.into_group_map()
|
||||
.into_iter()
|
||||
.collect();
|
||||
slot_schedule
|
||||
}
|
||||
|
||||
// fn calculate_leader_schedule_from_stake_map(
|
||||
// stake_map: &crate::stakestore::StakeMap,
|
||||
// vote_map: &crate::votestore::VoteMap,
|
||||
// current_epoch_info: &EpochInfo,
|
||||
// stake_history: Option<&StakeHistory>,
|
||||
// ) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
|
||||
// let mut stakes = HashMap::<Pubkey, u64>::new();
|
||||
// log::info!(
|
||||
// "calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
|
||||
// vote_map.len(),
|
||||
// stake_map.len(),
|
||||
// stake_history.as_ref().map(|h| h.len())
|
||||
// );
|
||||
|
||||
// let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
|
||||
|
||||
// //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
|
||||
// for storestake in stake_map.values() {
|
||||
// //get nodeid for vote account
|
||||
// let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
|
||||
// log::info!(
|
||||
// "Vote account not found in vote map for stake vote account:{}",
|
||||
// &storestake.stake.voter_pubkey
|
||||
// );
|
||||
// continue;
|
||||
// };
|
||||
// //TODO validate the number of epoch.
|
||||
// //remove vote account that hasn't vote since 10 epoch.
|
||||
// //on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
|
||||
// //hasn't vote since a long time but still return on RPC call get_voteaccounts.
|
||||
// //the validator don't use it for leader schedule.
|
||||
// if vote_account.vote_data.root_slot.unwrap_or_else(|| {
|
||||
// //vote with no root_slot are added. They have just been activated and can have active stake- TODO TO TEST.
|
||||
// log::info!(
|
||||
// "leader schedule vote:{} with None root_slot, add it",
|
||||
// vote_account.pubkey
|
||||
// );
|
||||
// current_epoch_info.absolute_slot
|
||||
// }) < current_epoch_info
|
||||
// .absolute_slot
|
||||
// .saturating_sub(ten_epoch_slot_long)
|
||||
// {
|
||||
// log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||
// , storestake.stake.voter_pubkey
|
||||
// ,vote_account.vote_data.node_pubkey
|
||||
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||
// , storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
|
||||
// );
|
||||
// } else {
|
||||
// let effective_stake = storestake
|
||||
// .stake
|
||||
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||
// .stake(current_epoch_info.epoch, stake_history, Some(0));
|
||||
// //only vote account with positive stake are use for the schedule.
|
||||
// if effective_stake > 0 {
|
||||
// *(stakes
|
||||
// .entry(vote_account.vote_data.node_pubkey)
|
||||
// .or_insert(0)) += effective_stake;
|
||||
// } else {
|
||||
// log::info!(
|
||||
// "leader schedule vote:{} with 0 effective vote",
|
||||
// storestake.stake.voter_pubkey
|
||||
// );
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
|
||||
// schedule_stakes.extend(stakes.drain());
|
||||
|
||||
// let leader_schedule = calculate_leader_schedule_old(
|
||||
// &mut schedule_stakes,
|
||||
// current_epoch_info.epoch,
|
||||
// current_epoch_info.slots_in_epoch,
|
||||
// )?;
|
||||
// Ok((leader_schedule, schedule_stakes))
|
||||
// }
|
||||
|
||||
// fn is_stake_to_add(
|
||||
// stake_pubkey: Pubkey,
|
||||
// stake: &Delegation,
|
||||
|
@ -461,30 +584,30 @@ fn calculate_leader_schedule_from_stake_map(
|
|||
|
||||
//Copied from leader_schedule_utils.rs
|
||||
// Mostly cribbed from leader_schedule_utils
|
||||
fn calculate_leader_schedule(
|
||||
stakes: &mut Vec<(Pubkey, u64)>,
|
||||
epoch: u64,
|
||||
slots_in_epoch: u64,
|
||||
) -> anyhow::Result<HashMap<String, Vec<usize>>> {
|
||||
if stakes.is_empty() {
|
||||
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
|
||||
}
|
||||
let mut seed = [0u8; 32];
|
||||
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
||||
sort_stakes(stakes);
|
||||
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
|
||||
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
||||
// fn calculate_leader_schedule_old(
|
||||
// stakes: &mut Vec<(Pubkey, u64)>,
|
||||
// epoch: u64,
|
||||
// slots_in_epoch: u64,
|
||||
// ) -> anyhow::Result<HashMap<String, Vec<usize>>> {
|
||||
// if stakes.is_empty() {
|
||||
// bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
|
||||
// }
|
||||
// let mut seed = [0u8; 32];
|
||||
// seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
||||
// sort_stakes(stakes);
|
||||
// log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
|
||||
// let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
||||
|
||||
let slot_schedule = schedule
|
||||
.get_slot_leaders()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, pk)| (pk.to_string(), i))
|
||||
.into_group_map()
|
||||
.into_iter()
|
||||
.collect();
|
||||
Ok(slot_schedule)
|
||||
}
|
||||
// let slot_schedule = schedule
|
||||
// .get_slot_leaders()
|
||||
// .iter()
|
||||
// .enumerate()
|
||||
// .map(|(i, pk)| (pk.to_string(), i))
|
||||
// .into_group_map()
|
||||
// .into_iter()
|
||||
// .collect();
|
||||
// Ok(slot_schedule)
|
||||
// }
|
||||
|
||||
// Cribbed from leader_schedule_utils
|
||||
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
|
||||
|
@ -603,7 +726,7 @@ use solana_sdk::stake::state::StakeState;
|
|||
pub fn build_current_stakes(
|
||||
stake_map: &crate::stakestore::StakeMap,
|
||||
stake_history: Option<&StakeHistory>,
|
||||
current_epoch_info: &EpochInfo,
|
||||
current_epoch: u64,
|
||||
rpc_url: String,
|
||||
commitment: CommitmentConfig,
|
||||
) -> BTreeMap<String, (u64, u64)> {
|
||||
|
@ -628,10 +751,9 @@ pub fn build_current_stakes(
|
|||
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
|
||||
StakeState::Stake(_, stake) => {
|
||||
//vote account version
|
||||
let effective_stake =
|
||||
stake
|
||||
let effective_stake = stake
|
||||
.delegation
|
||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
||||
.stake(current_epoch, stake_history, Some(0));
|
||||
if effective_stake > 0 {
|
||||
// Add the stake in this stake account to the total for the delegated-to vote account
|
||||
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
|
||||
|
@ -657,10 +779,7 @@ pub fn build_current_stakes(
|
|||
stake_map
|
||||
.iter()
|
||||
.filter_map(|(pubkey, stake)| {
|
||||
let effective_stake =
|
||||
stake
|
||||
.stake
|
||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
||||
let effective_stake = stake.stake.stake(current_epoch, stake_history, Some(0));
|
||||
(effective_stake > 0).then(|| (pubkey, stake, effective_stake))
|
||||
})
|
||||
.for_each(|(pubkey, stake, effective_stake)| {
|
||||
|
|
|
@ -48,7 +48,6 @@ use solana_sdk::stake::state::Delegation;
|
|||
use solana_sdk::vote::state::VoteState;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Duration;
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
||||
|
@ -169,7 +168,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
|
||||
&schedule_current_file,
|
||||
&schedule_next_file,
|
||||
current_epoch_state.current_epoch.slots_in_epoch,
|
||||
¤t_epoch_state,
|
||||
) {
|
||||
Ok(data) => data,
|
||||
Err(err) => {
|
||||
|
@ -246,8 +245,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
//Init bootstrap process
|
||||
let mut bootstrap_data = BootstrapData {
|
||||
done: false,
|
||||
current_epoch: current_epoch_state.current_epoch.epoch,
|
||||
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
|
||||
sleep_time: 1,
|
||||
rpc_url: RPC_URL.to_string(),
|
||||
};
|
||||
|
@ -261,7 +258,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
|
||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
||||
let mut parent_block_slot = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(req) = request_rx.recv() => {
|
||||
|
@ -270,13 +266,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
tokio::task::spawn_blocking({
|
||||
log::info!("RPC start save_stakes");
|
||||
let current_stakes = stakestore.get_cloned_stake_map();
|
||||
let history = stakestore.get_cloned_stake_history();
|
||||
let move_epoch = current_epoch_state.current_epoch.clone();
|
||||
let move_epoch = current_epoch_state.get_current_epoch();
|
||||
let stake_history = stakestore.get_stake_history();
|
||||
move || {
|
||||
let current_stake = crate::leader_schedule::build_current_stakes(
|
||||
¤t_stakes,
|
||||
history.as_ref(),
|
||||
&move_epoch,
|
||||
stake_history.as_ref(),
|
||||
move_epoch.epoch,
|
||||
RPC_URL.to_string(),
|
||||
CommitmentConfig::confirmed(),
|
||||
);
|
||||
|
@ -332,45 +328,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
}
|
||||
//Manage RPC call result execution
|
||||
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
|
||||
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
|
||||
let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
|
||||
RPC_URL.to_string(),
|
||||
event,
|
||||
&mut spawned_leader_schedule_task,
|
||||
&mut stakestore,
|
||||
&mut votestore,
|
||||
) {
|
||||
|
||||
);
|
||||
leader_schedule_data.current = leader_schedule_data.next.take();
|
||||
match new_schedule {
|
||||
Ok((new_schedule, new_stakes)) => {
|
||||
leader_schedule_data.next = Some(LeaderScheduleData {
|
||||
schedule: Arc::new(new_schedule),
|
||||
vote_stakes: new_stakes,
|
||||
epoch: epoch.epoch,
|
||||
});
|
||||
},
|
||||
Err(err) => {
|
||||
log::error!("Error during new leader schedule generation:{err}");
|
||||
log::error!("No leader schedule available for next epoch.");
|
||||
}
|
||||
}
|
||||
|
||||
//TODO remove verification when schedule ok.
|
||||
//verify calculated shedule with the one the RPC return.
|
||||
// if let Some(schedule) = new_schedule {
|
||||
// tokio::task::spawn_blocking(|| {
|
||||
// //10 second that the schedule has been calculated on the validator
|
||||
// std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
// log::info!("Start Verify schedule");
|
||||
// if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
|
||||
// log::warn!("Error during schedule verification:{err}");
|
||||
// }
|
||||
// log::info!("End Verify schedule");
|
||||
// });
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
leader_schedule_data.next = new_leader_schedule;
|
||||
}
|
||||
|
||||
//get confirmed slot or account
|
||||
|
@ -392,7 +358,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
|||
if let Err(err) = stakestore.notify_stake_change(
|
||||
account,
|
||||
current_epoch_state.current_epoch_end_slot(),
|
||||
current_epoch_state.current_epoch.epoch,
|
||||
) {
|
||||
log::warn!("Can't add new stake from account data err:{}", err);
|
||||
continue;
|
||||
|
|
|
@ -22,7 +22,7 @@ use thiserror::Error;
|
|||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000";
|
||||
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3001";
|
||||
const SERVER_ERROR_MSG: &str = "Internal server error";
|
||||
|
||||
//internal RPC access
|
||||
|
@ -88,7 +88,10 @@ pub struct RPCServer {
|
|||
|
||||
#[jsonrpsee::core::async_trait]
|
||||
impl ConsensusRpcServer for RPCServer {
|
||||
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> {
|
||||
async fn get_latest_blockhash(
|
||||
&self,
|
||||
_config: Option<RpcContextConfig>,
|
||||
) -> Result<RpcBlockhash> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
@ -137,7 +140,11 @@ pub fn server_rpc_request(
|
|||
) {
|
||||
match request {
|
||||
crate::rpc::Requests::EpochInfo(tx) => {
|
||||
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
|
||||
if let Err(err) = tx.send(
|
||||
current_epoch_state
|
||||
.get_current_epoch()
|
||||
.into_epoch_info(0, None),
|
||||
) {
|
||||
log::warn!("Channel error during sending back request status error:{err:?}");
|
||||
}
|
||||
}
|
||||
|
@ -158,11 +165,11 @@ pub fn server_rpc_request(
|
|||
|
||||
log::info!(
|
||||
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
|
||||
current_epoch_state.current_epoch.epoch
|
||||
current_epoch_state.get_current_epoch().epoch
|
||||
);
|
||||
//currently only return schedule for current of next epoch.
|
||||
let get_schedule_fn = |schedule: &LeaderScheduleData| {
|
||||
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //Arc clone.
|
||||
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //clone the schedule. TODO clone arc.
|
||||
};
|
||||
let schedule = leader_schedules
|
||||
.current
|
||||
|
@ -285,7 +292,7 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
|
|||
Err("Error stake store extracted".to_string())
|
||||
} else {
|
||||
//replace pubkey with String. Json only allow distionary key with string.
|
||||
let ret: HashMap<String, StoredVote> = accounts
|
||||
let ret: HashMap<String, Arc<StoredVote>> = accounts
|
||||
.into_iter()
|
||||
.map(|(pk, acc)| (pk.to_string(), acc))
|
||||
.collect();
|
||||
|
|
|
@ -12,13 +12,14 @@ use solana_sdk::stake::state::Delegation;
|
|||
use solana_sdk::stake::state::StakeState;
|
||||
use solana_sdk::stake_history::StakeHistory;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
|
||||
|
||||
pub type StakeMap = HashMap<Pubkey, StoredStake>;
|
||||
|
||||
pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMap> {
|
||||
pub fn extract_stakestore(
|
||||
stakestore: &mut StakeStore,
|
||||
) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
|
||||
let new_store = std::mem::take(stakestore);
|
||||
let (new_store, stake_map) = new_store.extract()?;
|
||||
*stakestore = new_store;
|
||||
|
@ -28,21 +29,15 @@ pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMa
|
|||
pub fn merge_stakestore(
|
||||
stakestore: &mut StakeStore,
|
||||
stake_map: StakeMap,
|
||||
current_epoch: u64,
|
||||
stake_history: Option<StakeHistory>,
|
||||
) -> anyhow::Result<()> {
|
||||
let new_store = std::mem::take(stakestore);
|
||||
let new_store = new_store.merge_stakes(stake_map, current_epoch)?;
|
||||
let new_store = new_store.merge_stakes(stake_map, stake_history)?;
|
||||
*stakestore = new_store;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch: u64) {
|
||||
//don't add stake that are already desactivated.
|
||||
//there's some stran,ge stake that has activeate_epock = max epoch and desactivate ecpock 90 that are taken into account.
|
||||
//Must be better defined.
|
||||
// if stake.stake.deactivation_epoch < current_epoch {
|
||||
// return;
|
||||
// }
|
||||
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) {
|
||||
//log::info!("stake_map_notify_stake stake:{stake:?}");
|
||||
match map.entry(stake.pubkey) {
|
||||
// If value already exists, then increment it by one
|
||||
|
@ -70,7 +65,6 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch:
|
|||
pub enum ExtractedAction {
|
||||
Notify {
|
||||
stake: StoredStake,
|
||||
current_epoch: u64,
|
||||
},
|
||||
Remove(Pubkey, Slot),
|
||||
Merge {
|
||||
|
@ -85,18 +79,12 @@ pub enum ExtractedAction {
|
|||
impl ExtractedAction {
|
||||
fn get_update_slot(&self) -> u64 {
|
||||
match self {
|
||||
ExtractedAction::Notify { stake, .. } => stake.last_update_slot,
|
||||
ExtractedAction::Notify { stake } => stake.last_update_slot,
|
||||
ExtractedAction::Remove(_, slot) => *slot,
|
||||
ExtractedAction::Merge { update_slot, .. } => *update_slot,
|
||||
ExtractedAction::None => 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_epoch(&mut self, epoch: u64) {
|
||||
if let ExtractedAction::Notify { current_epoch, .. } = self {
|
||||
*current_epoch = epoch;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
|
@ -108,16 +96,6 @@ pub struct StoredStake {
|
|||
pub write_version: u64,
|
||||
}
|
||||
|
||||
// impl StoredStake {
|
||||
// fn is_removed(&self, current_epoch: u64) -> bool {
|
||||
// self.stake.activation_epoch != crate::leader_schedule::MAX_EPOCH_VALUE
|
||||
// && self.stake.deactivation_epoch < current_epoch
|
||||
// }
|
||||
// fn is_inserted(&self, current_epoch: u64) -> bool {
|
||||
// self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
|
||||
// || self.stake.deactivation_epoch >= current_epoch
|
||||
// }
|
||||
// }
|
||||
#[derive(Debug, Default)]
|
||||
pub struct StakeStore {
|
||||
stakes: StakeMap,
|
||||
|
@ -136,8 +114,12 @@ impl StakeStore {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
|
||||
self.stake_history = Some(stake_history);
|
||||
// pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
|
||||
// self.stake_history = Some(Arc::new(stake_history));
|
||||
// }
|
||||
|
||||
pub fn get_stake_history(&self) -> Option<StakeHistory> {
|
||||
self.stake_history.clone()
|
||||
}
|
||||
|
||||
pub fn nb_stake_account(&self) -> usize {
|
||||
|
@ -147,48 +129,34 @@ impl StakeStore {
|
|||
pub fn get_cloned_stake_map(&self) -> StakeMap {
|
||||
self.stakes.clone()
|
||||
}
|
||||
pub fn get_cloned_stake_history(&self) -> Option<StakeHistory> {
|
||||
self.stake_history.clone()
|
||||
}
|
||||
|
||||
pub fn notify_stake_change(
|
||||
&mut self,
|
||||
new_account: AccountPretty,
|
||||
account: AccountPretty,
|
||||
current_end_epoch_slot: Slot,
|
||||
current_epoch: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let Ok(delegated_stake_opt) = new_account.read_stake() else {
|
||||
//if lamport == 0 the account has been removed.
|
||||
if account.lamports == 0 {
|
||||
self.notify_stake_action(
|
||||
ExtractedAction::Remove(account.pubkey, account.slot),
|
||||
current_end_epoch_slot,
|
||||
);
|
||||
} else {
|
||||
let Ok(delegated_stake_opt) = account.read_stake() else {
|
||||
bail!("Can't read stake from account data");
|
||||
};
|
||||
|
||||
if let Some(delegated_stake) = delegated_stake_opt {
|
||||
let stake = StoredStake {
|
||||
pubkey: new_account.pubkey,
|
||||
lamports: new_account.lamports,
|
||||
pubkey: account.pubkey,
|
||||
lamports: account.lamports,
|
||||
stake: delegated_stake,
|
||||
last_update_slot: new_account.slot,
|
||||
write_version: new_account.write_version,
|
||||
last_update_slot: account.slot,
|
||||
write_version: account.write_version,
|
||||
};
|
||||
|
||||
self.notify_stake_action(
|
||||
ExtractedAction::Notify {
|
||||
stake,
|
||||
current_epoch,
|
||||
},
|
||||
current_end_epoch_slot,
|
||||
);
|
||||
|
||||
//during extract push the new update or
|
||||
//don't insertnow account change that has been done in next epoch.
|
||||
//put in update pool to be merged next epoch change.
|
||||
// let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot;
|
||||
// match insert_stake {
|
||||
// false => self.updates.push(ExtractedAction::Notify {
|
||||
// stake_account: new_account.pubkey,
|
||||
// stake: ststake,
|
||||
// }),
|
||||
// true => self.notify_stake(new_account.pubkey, ststake, current_epoch),
|
||||
// }
|
||||
self.notify_stake_action(ExtractedAction::Notify { stake }, current_end_epoch_slot);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -206,11 +174,8 @@ impl StakeStore {
|
|||
|
||||
fn process_stake_action(&mut self, action: ExtractedAction) {
|
||||
match action {
|
||||
ExtractedAction::Notify {
|
||||
stake,
|
||||
current_epoch,
|
||||
} => {
|
||||
stake_map_notify_stake(&mut self.stakes, stake, current_epoch);
|
||||
ExtractedAction::Notify { stake } => {
|
||||
stake_map_notify_stake(&mut self.stakes, stake);
|
||||
}
|
||||
ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot),
|
||||
//not use currently. TODO remove.
|
||||
|
@ -229,27 +194,31 @@ impl StakeStore {
|
|||
//return the contained stake map to do an external update.
|
||||
// During extract period (between extract and merge) added stake a stored to be processed later.
|
||||
//if the store is already extracted return an error.
|
||||
fn extract(self) -> anyhow::Result<(Self, StakeMap)> {
|
||||
fn extract(self) -> anyhow::Result<(Self, (StakeMap, Option<StakeHistory>))> {
|
||||
if self.extracted {
|
||||
bail!("StakeStore already extracted. Try later");
|
||||
}
|
||||
let stakestore = StakeStore {
|
||||
stakes: HashMap::new(),
|
||||
stake_history: self.stake_history,
|
||||
stake_history: None,
|
||||
updates: self.updates,
|
||||
extracted: true,
|
||||
};
|
||||
Ok((stakestore, self.stakes))
|
||||
Ok((stakestore, (self.stakes, self.stake_history)))
|
||||
}
|
||||
|
||||
//Merge the stake map an,d replace the current one.
|
||||
fn merge_stakes(mut self, stakes: StakeMap, current_epoch: u64) -> anyhow::Result<Self> {
|
||||
fn merge_stakes(
|
||||
mut self,
|
||||
stakes: StakeMap,
|
||||
stake_history: Option<StakeHistory>,
|
||||
) -> anyhow::Result<Self> {
|
||||
if !self.extracted {
|
||||
bail!("StakeStore merge of non extracted map. Try later");
|
||||
}
|
||||
let mut stakestore = StakeStore {
|
||||
stakes,
|
||||
stake_history: self.stake_history,
|
||||
stake_history,
|
||||
updates: vec![],
|
||||
extracted: false,
|
||||
};
|
||||
|
@ -257,25 +226,9 @@ impl StakeStore {
|
|||
self.updates
|
||||
.sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot()));
|
||||
//apply stake added during extraction.
|
||||
for mut action in self.updates {
|
||||
action.update_epoch(current_epoch);
|
||||
for action in self.updates {
|
||||
stakestore.process_stake_action(action);
|
||||
}
|
||||
|
||||
//verify one stake account to test. TODO remove
|
||||
let stake_account = stakestore
|
||||
.stakes
|
||||
.get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
|
||||
let stake = stake_account.map(|stake| {
|
||||
stake
|
||||
.stake
|
||||
.stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
|
||||
});
|
||||
log::info!(
|
||||
"merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
|
||||
stake
|
||||
);
|
||||
|
||||
Ok(stakestore)
|
||||
}
|
||||
|
||||
|
@ -286,7 +239,7 @@ impl StakeStore {
|
|||
.map(|stake| stake.last_update_slot <= update_slot)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
log::info!("remove_from_store for {}", account_pk.to_string());
|
||||
log::info!("Stake remove_from_store for {}", account_pk.to_string());
|
||||
self.stakes.remove(account_pk);
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +249,6 @@ pub fn merge_program_account_in_strake_map(
|
|||
stake_map: &mut StakeMap,
|
||||
stakes_list: Vec<(Pubkey, Account)>,
|
||||
last_update_slot: Slot,
|
||||
current_epoch: u64,
|
||||
) {
|
||||
stakes_list
|
||||
.into_iter()
|
||||
|
@ -318,7 +270,7 @@ pub fn merge_program_account_in_strake_map(
|
|||
write_version: 0,
|
||||
};
|
||||
|
||||
stake_map_notify_stake(stake_map, stake, current_epoch);
|
||||
stake_map_notify_stake(stake_map, stake);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -764,21 +716,21 @@ pub async fn process_stake_tx_message(
|
|||
current_end_epoch_slot,
|
||||
);
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Merge Destination",
|
||||
account_keys[instruction.accounts[0] as usize],
|
||||
)
|
||||
.await;
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Destination",
|
||||
// account_keys[instruction.accounts[0] as usize],
|
||||
// )
|
||||
// .await;
|
||||
|
||||
send_verification(
|
||||
stake_sender,
|
||||
stakestore,
|
||||
"Merge Source",
|
||||
account_keys[instruction.accounts[1] as usize],
|
||||
)
|
||||
.await;
|
||||
// send_verification(
|
||||
// stake_sender,
|
||||
// stakestore,
|
||||
// "Merge Source",
|
||||
// account_keys[instruction.accounts[1] as usize],
|
||||
// )
|
||||
// .await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,9 @@ use solana_sdk::account::Account;
|
|||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::vote::state::VoteState;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub type VoteMap = HashMap<Pubkey, StoredVote>;
|
||||
pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
|
||||
|
||||
//Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct
|
||||
//to avoid to add a dep to solana_rpc_client that create
|
||||
|
@ -104,6 +105,9 @@ impl VoteStore {
|
|||
new_account: AccountPretty,
|
||||
current_end_epoch_slot: Slot,
|
||||
) -> anyhow::Result<()> {
|
||||
if new_account.lamports == 0 {
|
||||
self.remove_from_store(&new_account.pubkey, new_account.slot);
|
||||
} else {
|
||||
let Ok(vote_data) = new_account.read_vote() else {
|
||||
bail!("Can't read Vote from account data");
|
||||
};
|
||||
|
@ -120,16 +124,31 @@ impl VoteStore {
|
|||
//during extract push the new update or
|
||||
//don't insertnow account change that has been done in next epoch.
|
||||
//put in update pool to be merged next epoch change.
|
||||
let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
||||
let insert_stake =
|
||||
!self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
||||
match insert_stake {
|
||||
false => self.updates.push((new_account.pubkey, new_voteacc)),
|
||||
true => self.insert_vote(new_account.pubkey, new_voteacc),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
|
||||
vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
|
||||
}
|
||||
|
||||
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
|
||||
if self
|
||||
.votes
|
||||
.get(account_pk)
|
||||
.map(|vote| vote.last_update_slot <= update_slot)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
log::info!("Vote remove_from_store for {}", account_pk.to_string());
|
||||
self.votes.remove(account_pk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge_program_account_in_vote_map(
|
||||
|
@ -180,7 +199,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
|
|||
);
|
||||
}
|
||||
|
||||
*voteacc = vote_data;
|
||||
*voteacc = Arc::new(vote_data);
|
||||
}
|
||||
}
|
||||
// If value doesn't exist yet, then insert a new value of 1
|
||||
|
@ -190,7 +209,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
|
|||
vote_data.vote_data.node_pubkey,
|
||||
vote_data.vote_data.root_slot,
|
||||
);
|
||||
vacant.insert(vote_data);
|
||||
vacant.insert(Arc::new(vote_data));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue