Compare commits
3 Commits
2dc96a7a50
...
1c6aa5fe8b
Author | SHA1 | Date |
---|---|---|
|
1c6aa5fe8b | |
|
f91965e1dd | |
|
db8a3079cb |
|
@ -1,12 +1,12 @@
|
||||||
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
||||||
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
||||||
use crate::Slot;
|
|
||||||
use futures_util::stream::FuturesUnordered;
|
use futures_util::stream::FuturesUnordered;
|
||||||
use solana_client::client_error::ClientError;
|
use solana_client::client_error::ClientError;
|
||||||
use solana_client::rpc_client::RpcClient;
|
use solana_client::rpc_client::RpcClient;
|
||||||
use solana_sdk::account::Account;
|
use solana_sdk::account::Account;
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use solana_sdk::stake_history::StakeHistory;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
/*
|
/*
|
||||||
|
@ -64,7 +64,7 @@ pub enum BootstrapEvent {
|
||||||
Vec<(Pubkey, Account)>,
|
Vec<(Pubkey, Account)>,
|
||||||
Account,
|
Account,
|
||||||
),
|
),
|
||||||
AccountsMerged(StakeMap, VoteMap),
|
AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap),
|
||||||
Exit,
|
Exit,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,8 +76,6 @@ enum BootsrapProcessResult {
|
||||||
|
|
||||||
pub struct BootstrapData {
|
pub struct BootstrapData {
|
||||||
pub done: bool,
|
pub done: bool,
|
||||||
pub current_epoch: u64,
|
|
||||||
pub next_epoch_start_slot: Slot,
|
|
||||||
pub sleep_time: u64,
|
pub sleep_time: u64,
|
||||||
pub rpc_url: String,
|
pub rpc_url: String,
|
||||||
}
|
}
|
||||||
|
@ -116,7 +114,7 @@ fn process_bootstrap_event(
|
||||||
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
|
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
|
||||||
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
|
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
|
||||||
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
||||||
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event(
|
(Ok((stake_map, _)), Ok(vote_map)) => BootsrapProcessResult::Event(
|
||||||
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
|
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
|
||||||
),
|
),
|
||||||
_ => {
|
_ => {
|
||||||
|
@ -131,27 +129,20 @@ fn process_bootstrap_event(
|
||||||
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
|
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
|
||||||
log::info!("BootstrapEvent::StoreExtracted RECV");
|
log::info!("BootstrapEvent::StoreExtracted RECV");
|
||||||
|
|
||||||
match crate::stakestore::read_historystake_from_account(history) {
|
let stake_history = crate::stakestore::read_historystake_from_account(history);
|
||||||
Some(stake_history) => {
|
if stake_history.is_none() {
|
||||||
log::info!(
|
//TODO return error.
|
||||||
"Read stake history done with history len:{}",
|
log::error!("Bootstrap error, can't read stake history.");
|
||||||
stake_history.len()
|
|
||||||
);
|
|
||||||
stakestore.set_stake_history(stake_history);
|
|
||||||
}
|
|
||||||
None => log::error!("Bootstrap error, can't read stake history."),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//merge new PA with stake map and vote map in a specific task
|
//merge new PA with stake map and vote map in a specific task
|
||||||
let jh = tokio::task::spawn_blocking({
|
let jh = tokio::task::spawn_blocking({
|
||||||
let current_epoch = data.current_epoch;
|
|
||||||
move || {
|
move || {
|
||||||
//update pa_list to set slot update to start epoq one.
|
//update pa_list to set slot update to start epoq one.
|
||||||
crate::stakestore::merge_program_account_in_strake_map(
|
crate::stakestore::merge_program_account_in_strake_map(
|
||||||
&mut stake_map,
|
&mut stake_map,
|
||||||
stakes,
|
stakes,
|
||||||
0, //with RPC no way to know the slot of the account update. Set to 0.
|
0, //with RPC no way to know the slot of the account update. Set to 0.
|
||||||
current_epoch,
|
|
||||||
);
|
);
|
||||||
crate::votestore::merge_program_account_in_vote_map(
|
crate::votestore::merge_program_account_in_vote_map(
|
||||||
&mut vote_map,
|
&mut vote_map,
|
||||||
|
@ -159,15 +150,15 @@ fn process_bootstrap_event(
|
||||||
0, //with RPC no way to know the slot of the account update. Set to 0.
|
0, //with RPC no way to know the slot of the account update. Set to 0.
|
||||||
);
|
);
|
||||||
|
|
||||||
BootstrapEvent::AccountsMerged(stake_map, vote_map)
|
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
BootsrapProcessResult::TaskHandle(jh)
|
BootsrapProcessResult::TaskHandle(jh)
|
||||||
}
|
}
|
||||||
BootstrapEvent::AccountsMerged(stake_map, vote_map) => {
|
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map) => {
|
||||||
log::info!("BootstrapEvent::AccountsMerged RECV");
|
log::info!("BootstrapEvent::AccountsMerged RECV");
|
||||||
match (
|
match (
|
||||||
merge_stakestore(stakestore, stake_map, data.current_epoch),
|
merge_stakestore(stakestore, stake_map, stake_history),
|
||||||
merge_votestore(votestore, vote_map),
|
merge_votestore(votestore, vote_map),
|
||||||
) {
|
) {
|
||||||
(Ok(()), Ok(())) => BootsrapProcessResult::End,
|
(Ok(()), Ok(())) => BootsrapProcessResult::End,
|
||||||
|
|
|
@ -1,132 +1,150 @@
|
||||||
use crate::leader_schedule::LeaderScheduleEvent;
|
use crate::leader_schedule::LeaderScheduleEvent;
|
||||||
use crate::Slot;
|
use crate::Slot;
|
||||||
|
use anyhow::bail;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use solana_account_decoder::parse_sysvar::SysvarAccountType;
|
use solana_account_decoder::parse_sysvar::SysvarAccountType;
|
||||||
use solana_client::client_error::ClientError;
|
|
||||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||||
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
|
||||||
use solana_sdk::epoch_info::EpochInfo;
|
use solana_sdk::epoch_info::EpochInfo;
|
||||||
|
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
|
||||||
|
use std::sync::Arc;
|
||||||
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
|
||||||
|
pub struct Epoch {
|
||||||
|
pub epoch: u64,
|
||||||
|
pub slot_index: u64,
|
||||||
|
pub slots_in_epoch: u64,
|
||||||
|
pub absolute_slot: Slot,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Epoch {
|
||||||
|
pub fn get_next_epoch(&self, current_epoch: &CurrentEpochSlotState) -> Epoch {
|
||||||
|
let last_slot = current_epoch.epoch_cache.get_last_slot_in_epoch(self.epoch);
|
||||||
|
current_epoch.epoch_cache.get_epoch_at_slot(last_slot + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
|
||||||
|
EpochInfo {
|
||||||
|
epoch: self.epoch,
|
||||||
|
slot_index: self.slot_index,
|
||||||
|
slots_in_epoch: self.slots_in_epoch,
|
||||||
|
absolute_slot: self.absolute_slot,
|
||||||
|
block_height,
|
||||||
|
transaction_count,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
|
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
|
||||||
let slots_in_epoch = current_epoch.current_epoch.slots_in_epoch;
|
current_epoch.epoch_cache.get_epoch_at_slot(slot).epoch
|
||||||
let epoch_start_slot = current_epoch.current_epoch_start_slot();
|
}
|
||||||
if slot >= epoch_start_slot {
|
|
||||||
let slot_distance = (slot - epoch_start_slot) / slots_in_epoch;
|
#[derive(Clone, Debug)]
|
||||||
current_epoch.current_epoch.epoch + slot_distance
|
pub struct EpochCache {
|
||||||
} else {
|
epoch_schedule: Arc<EpochSchedule>,
|
||||||
let slot_distance = (epoch_start_slot - slot) / slots_in_epoch;
|
}
|
||||||
current_epoch.current_epoch.epoch - slot_distance
|
|
||||||
|
impl EpochCache {
|
||||||
|
pub fn get_epoch_at_slot(&self, slot: Slot) -> Epoch {
|
||||||
|
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
|
||||||
|
let slots_in_epoch = self.epoch_schedule.get_slots_in_epoch(epoch);
|
||||||
|
Epoch {
|
||||||
|
epoch,
|
||||||
|
slot_index,
|
||||||
|
slots_in_epoch,
|
||||||
|
absolute_slot: slot,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
|
||||||
|
// self.epoch_schedule.as_ref().clone()
|
||||||
|
// }
|
||||||
|
|
||||||
|
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
|
||||||
|
self.epoch_schedule.get_slots_in_epoch(epoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub fn get_first_slot_in_epoch(&self, epoch: u64) -> u64 {
|
||||||
|
// self.epoch_schedule.get_first_slot_in_epoch(epoch)
|
||||||
|
// }
|
||||||
|
|
||||||
|
pub fn get_last_slot_in_epoch(&self, epoch: u64) -> u64 {
|
||||||
|
self.epoch_schedule.get_last_slot_in_epoch(epoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
|
||||||
|
let res_epoch = rpc_client
|
||||||
|
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
|
||||||
|
.await?;
|
||||||
|
let Some(SysvarAccountType::EpochSchedule(epoch_schedule)) =
|
||||||
|
bincode::deserialize(&res_epoch.data[..])
|
||||||
|
.ok()
|
||||||
|
.map(SysvarAccountType::EpochSchedule)
|
||||||
|
else {
|
||||||
|
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(EpochCache {
|
||||||
|
epoch_schedule: Arc::new(epoch_schedule),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct CurrentEpochSlotState {
|
pub struct CurrentEpochSlotState {
|
||||||
pub current_slot: CurrentSlot,
|
pub current_slot: CurrentSlot,
|
||||||
pub current_epoch: EpochInfo,
|
epoch_cache: EpochCache,
|
||||||
pub next_epoch_start_slot: Slot,
|
current_epoch_value: Epoch,
|
||||||
pub first_epoch_slot: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CurrentEpochSlotState {
|
impl CurrentEpochSlotState {
|
||||||
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> {
|
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
|
||||||
|
// self.epoch_cache.get_epoch_shedule()
|
||||||
|
// }
|
||||||
|
|
||||||
|
pub fn get_current_epoch(&self) -> Epoch {
|
||||||
|
self.current_epoch_value
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
|
||||||
|
self.epoch_cache.get_slots_in_epoch(epoch)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> {
|
||||||
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
|
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
|
||||||
|
|
||||||
//get reduce_stake_warmup_cooldown feature info.
|
let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?;
|
||||||
//NOT AND ACCOUNT. Get from config.
|
|
||||||
// let reduce_stake_warmup_cooldown_epoch = rpc_client
|
|
||||||
// .get_account(&feature_set::reduce_stake_warmup_cooldown::id())
|
|
||||||
// .await?;
|
|
||||||
|
|
||||||
// let reduce_stake_warmup_cooldown_epoch = bincode::deserialize(&reduce_stake_warmup_cooldown_epoch.data[..])
|
|
||||||
// .ok()
|
|
||||||
// .map(SysvarAccountType::EpochSchedule);
|
|
||||||
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
|
|
||||||
|
|
||||||
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
|
|
||||||
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
|
|
||||||
let res_epoch = rpc_client
|
|
||||||
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
|
|
||||||
.await?;
|
|
||||||
let sysvar_epoch_schedule = bincode::deserialize(&res_epoch.data[..])
|
|
||||||
.ok()
|
|
||||||
.map(SysvarAccountType::EpochSchedule);
|
|
||||||
log::info!("sysvar_epoch_schedule {sysvar_epoch_schedule:?}");
|
|
||||||
|
|
||||||
// Fetch current epoch
|
|
||||||
let current_epoch = rpc_client.get_epoch_info().await?;
|
|
||||||
let next_epoch_start_slot =
|
|
||||||
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
|
|
||||||
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
|
|
||||||
Ok(CurrentEpochSlotState {
|
Ok(CurrentEpochSlotState {
|
||||||
current_slot: Default::default(),
|
current_slot: CurrentSlot::default(),
|
||||||
current_epoch,
|
epoch_cache,
|
||||||
next_epoch_start_slot,
|
current_epoch_value: Epoch::default(),
|
||||||
first_epoch_slot: false,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn current_epoch_start_slot(&self) -> Slot {
|
// pub fn current_epoch_start_slot(&self) -> Slot {
|
||||||
self.current_epoch.absolute_slot - self.current_epoch.slot_index
|
// self.epoch_cache
|
||||||
}
|
// .get_first_slot_in_epoch(self.current_slot.confirmed_slot)
|
||||||
|
// }
|
||||||
|
|
||||||
pub fn current_epoch_end_slot(&self) -> Slot {
|
pub fn current_epoch_end_slot(&self) -> Slot {
|
||||||
self.next_epoch_start_slot - 1
|
self.epoch_cache
|
||||||
|
.get_last_slot_in_epoch(self.current_epoch_value.epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_new_slot(
|
pub fn process_new_slot(
|
||||||
&mut self,
|
&mut self,
|
||||||
new_slot: &SubscribeUpdateSlot,
|
new_slot: &SubscribeUpdateSlot,
|
||||||
) -> Option<LeaderScheduleEvent> {
|
) -> Option<LeaderScheduleEvent> {
|
||||||
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
|
|
||||||
//for the first update of slot correct epoch info data.
|
|
||||||
if self.current_slot.confirmed_slot == 0 {
|
|
||||||
let diff = new_slot.slot - self.current_epoch.absolute_slot;
|
|
||||||
self.current_epoch.slot_index += diff;
|
|
||||||
self.current_epoch.absolute_slot = new_slot.slot;
|
|
||||||
log::trace!(
|
|
||||||
"Set current epoch with diff:{diff} slot:{} current:{:?}",
|
|
||||||
new_slot.slot,
|
|
||||||
self.current_epoch,
|
|
||||||
);
|
|
||||||
//if new slot update slot related state data.
|
|
||||||
} else if new_slot.slot > self.current_slot.confirmed_slot {
|
|
||||||
//update epoch info
|
|
||||||
let mut diff = new_slot.slot - self.current_slot.confirmed_slot;
|
|
||||||
//First epoch slot, index is 0 so remove 1 from diff.
|
|
||||||
if self.first_epoch_slot {
|
|
||||||
//calculate next epoch data
|
|
||||||
self.current_epoch.epoch += 1;
|
|
||||||
//slot can be non consecutif, use diff.
|
|
||||||
self.current_epoch.slot_index = 0;
|
|
||||||
self.current_epoch.absolute_slot = new_slot.slot;
|
|
||||||
log::info!(
|
|
||||||
"change_epoch calculated next epoch:{:?} at slot:{}",
|
|
||||||
self.current_epoch,
|
|
||||||
new_slot.slot,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.first_epoch_slot = false;
|
|
||||||
//slot_index start at 0.
|
|
||||||
diff -= 1; //diff is always >= 1
|
|
||||||
self.next_epoch_start_slot =
|
|
||||||
self.next_epoch_start_slot + self.current_epoch.slots_in_epoch;
|
|
||||||
//set to next epochs.
|
|
||||||
} else {
|
|
||||||
self.current_epoch.absolute_slot = new_slot.slot;
|
|
||||||
}
|
|
||||||
self.current_epoch.slot_index += diff;
|
|
||||||
|
|
||||||
log::trace!(
|
|
||||||
"Slot epoch with slot:{} , diff:{diff} current epoch state:{self:?}",
|
|
||||||
new_slot.slot
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//update slot state for all commitment.
|
|
||||||
self.current_slot.update_slot(&new_slot);
|
self.current_slot.update_slot(&new_slot);
|
||||||
|
|
||||||
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
|
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
|
||||||
|
if self.current_epoch_value.epoch == 0 {
|
||||||
|
//init first epoch
|
||||||
|
self.current_epoch_value = self.epoch_cache.get_epoch_at_slot(new_slot.slot);
|
||||||
|
}
|
||||||
self.manage_change_epoch()
|
self.manage_change_epoch()
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -134,22 +152,17 @@ impl CurrentEpochSlotState {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
|
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
|
||||||
//we change the epoch at the last slot of the current epoch.
|
if self.current_slot.confirmed_slot > self.current_epoch_end_slot() {
|
||||||
if self.current_slot.confirmed_slot >= self.current_epoch_end_slot() {
|
log::info!("Change epoch at slot:{}", self.current_slot.confirmed_slot);
|
||||||
log::info!(
|
|
||||||
"End epoch slot detected:{}",
|
|
||||||
self.current_slot.confirmed_slot
|
|
||||||
);
|
|
||||||
|
|
||||||
//set epoch change effectif at next slot.
|
|
||||||
self.first_epoch_slot = true;
|
|
||||||
|
|
||||||
|
self.current_epoch_value = self.get_current_epoch().get_next_epoch(&self);
|
||||||
//start leader schedule calculus
|
//start leader schedule calculus
|
||||||
//switch to 2 next epoch to calculate schedule at next epoch.
|
|
||||||
//at current epoch change the schedule is calculated for the next epoch.
|
//at current epoch change the schedule is calculated for the next epoch.
|
||||||
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&self.current_epoch);
|
Some(crate::leader_schedule::create_schedule_init_event(
|
||||||
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&schedule_epoch);
|
self.current_epoch_value.epoch,
|
||||||
Some(LeaderScheduleEvent::InitLeaderschedule(schedule_epoch))
|
self.get_slots_in_epoch(self.current_epoch_value.epoch),
|
||||||
|
Arc::clone(&self.epoch_cache.epoch_schedule),
|
||||||
|
))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,19 @@
|
||||||
|
use crate::epoch::CurrentEpochSlotState;
|
||||||
|
use crate::epoch::Epoch;
|
||||||
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
|
||||||
|
use crate::votestore::StoredVote;
|
||||||
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
|
||||||
use anyhow::bail;
|
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use solana_client::rpc_client::RpcClient;
|
use solana_client::rpc_client::RpcClient;
|
||||||
use solana_ledger::leader_schedule::LeaderSchedule;
|
use solana_ledger::leader_schedule::LeaderSchedule;
|
||||||
|
use solana_program::sysvar::epoch_schedule::EpochSchedule;
|
||||||
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
|
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use solana_sdk::epoch_info::EpochInfo;
|
use solana_sdk::feature_set::FeatureSet;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use solana_sdk::stake::state::StakeActivationStatus;
|
||||||
use solana_sdk::stake_history::StakeHistory;
|
use solana_sdk::stake_history::StakeHistory;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@ -34,75 +38,69 @@ pub struct CalculatedSchedule {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct LeaderScheduleData {
|
pub struct LeaderScheduleData {
|
||||||
pub schedule: Arc<HashMap<String, Vec<usize>>>,
|
pub schedule: Arc<HashMap<String, Vec<usize>>>,
|
||||||
pub vote_stakes: Vec<(Pubkey, u64)>,
|
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||||
pub epoch: u64,
|
pub epoch: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct EpochStake {
|
pub struct EpochStake {
|
||||||
epoch: u64,
|
epoch: u64,
|
||||||
stakes: Vec<(Pubkey, u64)>,
|
stake_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SavedStake> for EpochStake {
|
impl From<SavedStake> for EpochStake {
|
||||||
fn from(saved_stakes: SavedStake) -> Self {
|
fn from(saved_stakes: SavedStake) -> Self {
|
||||||
EpochStake {
|
EpochStake {
|
||||||
epoch: saved_stakes.epoch,
|
epoch: saved_stakes.epoch,
|
||||||
stakes: saved_stakes
|
stake_vote_map: saved_stakes
|
||||||
.stakes
|
.stake_vote_map
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), st))
|
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
|
||||||
.collect(),
|
.collect(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next_schedule_epoch(current_epoch: &EpochInfo) -> EpochInfo {
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||||
let mut next_epoch_info = current_epoch.clone();
|
struct SavedStake {
|
||||||
next_epoch_info.epoch += 1;
|
epoch: u64,
|
||||||
next_epoch_info.slot_index = 0;
|
stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
|
||||||
next_epoch_info.absolute_slot =
|
|
||||||
current_epoch.absolute_slot + current_epoch.slots_in_epoch - current_epoch.slot_index;
|
|
||||||
next_epoch_info
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bootstrap_leader_schedule(
|
pub fn bootstrap_leader_schedule(
|
||||||
current_file_patch: &str,
|
current_file_patch: &str,
|
||||||
next_file_patch: &str,
|
next_file_patch: &str,
|
||||||
slots_in_epoch: u64,
|
epoch_state: &CurrentEpochSlotState,
|
||||||
) -> anyhow::Result<CalculatedSchedule> {
|
) -> anyhow::Result<CalculatedSchedule> {
|
||||||
let mut current_stakes = read_schedule_vote_stakes(current_file_patch)?;
|
let current_stakes = read_schedule_vote_stakes(current_file_patch)?;
|
||||||
let mut next_stakes = read_schedule_vote_stakes(next_file_patch)?;
|
let next_stakes = read_schedule_vote_stakes(next_file_patch)?;
|
||||||
|
|
||||||
//calcualte leader schedule for all vote stakes.
|
//calcualte leader schedule for all vote stakes.
|
||||||
let current_schedule = calculate_leader_schedule(
|
let current_schedule = calculate_leader_schedule(
|
||||||
&mut current_stakes.stakes,
|
¤t_stakes.stake_vote_map,
|
||||||
current_stakes.epoch,
|
current_stakes.epoch,
|
||||||
slots_in_epoch,
|
epoch_state.get_slots_in_epoch(current_stakes.epoch),
|
||||||
)?;
|
);
|
||||||
let next_schedule =
|
let next_schedule = calculate_leader_schedule(
|
||||||
calculate_leader_schedule(&mut next_stakes.stakes, next_stakes.epoch, slots_in_epoch)?;
|
&next_stakes.stake_vote_map,
|
||||||
|
next_stakes.epoch,
|
||||||
|
epoch_state.get_slots_in_epoch(next_stakes.epoch),
|
||||||
|
);
|
||||||
|
|
||||||
Ok(CalculatedSchedule {
|
Ok(CalculatedSchedule {
|
||||||
current: Some(LeaderScheduleData {
|
current: Some(LeaderScheduleData {
|
||||||
schedule: Arc::new(current_schedule),
|
schedule: Arc::new(current_schedule),
|
||||||
vote_stakes: current_stakes.stakes,
|
vote_stakes: current_stakes.stake_vote_map,
|
||||||
epoch: current_stakes.epoch,
|
epoch: epoch_state.get_slots_in_epoch(current_stakes.epoch),
|
||||||
}),
|
}),
|
||||||
next: Some(LeaderScheduleData {
|
next: Some(LeaderScheduleData {
|
||||||
schedule: Arc::new(next_schedule),
|
schedule: Arc::new(next_schedule),
|
||||||
vote_stakes: next_stakes.stakes,
|
vote_stakes: next_stakes.stake_vote_map,
|
||||||
epoch: next_stakes.epoch,
|
epoch: epoch_state.get_slots_in_epoch(next_stakes.epoch),
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
|
||||||
struct SavedStake {
|
|
||||||
epoch: u64,
|
|
||||||
stakes: Vec<(String, u64)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
|
fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
|
||||||
let content = std::fs::read_to_string(file_path)?;
|
let content = std::fs::read_to_string(file_path)?;
|
||||||
let stakes_str: SavedStake = serde_json::from_str(&content)?;
|
let stakes_str: SavedStake = serde_json::from_str(&content)?;
|
||||||
|
@ -112,16 +110,16 @@ fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
|
||||||
|
|
||||||
fn save_schedule_vote_stakes(
|
fn save_schedule_vote_stakes(
|
||||||
base_file_path: &str,
|
base_file_path: &str,
|
||||||
stakes: &Vec<(Pubkey, u64)>,
|
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||||
epoch: u64,
|
epoch: u64,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
//save new schedule for restart.
|
//save new schedule for restart.
|
||||||
let filename = format!("{base_file_path}_{epoch}.json",);
|
let filename = format!("{base_file_path}_{}.json", epoch);
|
||||||
let save_stakes = SavedStake {
|
let save_stakes = SavedStake {
|
||||||
epoch,
|
epoch,
|
||||||
stakes: stakes
|
stake_vote_map: stake_vote_map
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(pk, st)| (pk.to_string(), *st))
|
.map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
|
||||||
.collect(),
|
.collect(),
|
||||||
};
|
};
|
||||||
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
|
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
|
||||||
|
@ -157,11 +155,8 @@ pub fn run_leader_schedule_events(
|
||||||
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
|
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
|
||||||
stakestore: &mut StakeStore,
|
stakestore: &mut StakeStore,
|
||||||
votestore: &mut VoteStore,
|
votestore: &mut VoteStore,
|
||||||
) -> Option<(
|
) -> Option<LeaderScheduleData> {
|
||||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
let result = process_leadershedule_event(event, stakestore, votestore);
|
||||||
EpochInfo,
|
|
||||||
)> {
|
|
||||||
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
|
|
||||||
match result {
|
match result {
|
||||||
LeaderScheduleResult::TaskHandle(jh) => {
|
LeaderScheduleResult::TaskHandle(jh) => {
|
||||||
bootstrap_tasks.push(jh);
|
bootstrap_tasks.push(jh);
|
||||||
|
@ -170,132 +165,155 @@ pub fn run_leader_schedule_events(
|
||||||
LeaderScheduleResult::Event(event) => {
|
LeaderScheduleResult::Event(event) => {
|
||||||
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
|
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
|
||||||
}
|
}
|
||||||
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
|
LeaderScheduleResult::End(schedule) => Some(schedule),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// struct LeaderScheduleEpochData {
|
||||||
|
// current_epoch: u64,
|
||||||
|
// next_epoch: u64,
|
||||||
|
// slots_in_epoch: u64,
|
||||||
|
// epoch_schedule: Arc<EpochSchedule>,
|
||||||
|
// }
|
||||||
|
|
||||||
pub enum LeaderScheduleEvent {
|
pub enum LeaderScheduleEvent {
|
||||||
InitLeaderschedule(EpochInfo),
|
Init(u64, u64, Arc<EpochSchedule>),
|
||||||
CalculateScedule(StakeMap, VoteMap, EpochInfo, Option<StakeHistory>),
|
|
||||||
MergeStoreAndSaveSchedule(
|
MergeStoreAndSaveSchedule(
|
||||||
StakeMap,
|
StakeMap,
|
||||||
VoteMap,
|
VoteMap,
|
||||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
LeaderScheduleData,
|
||||||
EpochInfo,
|
(u64, u64, Arc<EpochSchedule>),
|
||||||
|
Option<StakeHistory>,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
enum LeaderScheduleResult {
|
enum LeaderScheduleResult {
|
||||||
TaskHandle(JoinHandle<LeaderScheduleEvent>),
|
TaskHandle(JoinHandle<LeaderScheduleEvent>),
|
||||||
Event(LeaderScheduleEvent),
|
Event(LeaderScheduleEvent),
|
||||||
End(
|
End(LeaderScheduleData),
|
||||||
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
|
}
|
||||||
EpochInfo,
|
|
||||||
),
|
pub fn create_schedule_init_event(
|
||||||
|
current_epoch: u64,
|
||||||
|
slots_in_epoch: u64,
|
||||||
|
epoch_schedule: Arc<EpochSchedule>,
|
||||||
|
) -> LeaderScheduleEvent {
|
||||||
|
log::info!("LeaderScheduleEvent::InitLeaderschedule START");
|
||||||
|
//TODO get a way to be updated of stake history.
|
||||||
|
//request the current state using RPC.
|
||||||
|
//TODO remove the await from the scheduler task.
|
||||||
|
// let before_stake_histo = stake_history.clone();
|
||||||
|
// let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
|
||||||
|
// .map(|account| crate::stakestore::read_historystake_from_account(account))
|
||||||
|
// .unwrap_or_else(|_| {
|
||||||
|
// log::error!("Error during stake history fetch. Use bootstrap one");
|
||||||
|
// stake_history.map(|st| st.as_ref().clone())
|
||||||
|
// });
|
||||||
|
// log::info!(
|
||||||
|
// "stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
|
||||||
|
// epoch_data.current_epoch
|
||||||
|
// );
|
||||||
|
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule)
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO remove desactivated account after leader schedule calculus.
|
//TODO remove desactivated account after leader schedule calculus.
|
||||||
fn process_leadershedule_event(
|
fn process_leadershedule_event(
|
||||||
rpc_url: String,
|
// rpc_url: String,
|
||||||
event: LeaderScheduleEvent,
|
event: LeaderScheduleEvent,
|
||||||
stakestore: &mut StakeStore,
|
stakestore: &mut StakeStore,
|
||||||
votestore: &mut VoteStore,
|
votestore: &mut VoteStore,
|
||||||
) -> LeaderScheduleResult {
|
) -> LeaderScheduleResult {
|
||||||
match event {
|
match event {
|
||||||
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => {
|
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule) => {
|
||||||
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
|
|
||||||
//For test TODO put in extract and restore process to avoid to clone.
|
|
||||||
let stake_history = stakestore.get_cloned_stake_history();
|
|
||||||
//TODO get a way to be updated of stake history.
|
|
||||||
//request the current state using RPC.
|
|
||||||
//TODO remove the await from the scheduler task.
|
|
||||||
let before_stake_histo = stake_history.clone();
|
|
||||||
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
|
|
||||||
.map(|account| crate::stakestore::read_historystake_from_account(account))
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
log::error!("Error during stake history fetch. Use bootstrap one");
|
|
||||||
stake_history
|
|
||||||
});
|
|
||||||
log::info!(
|
|
||||||
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
|
|
||||||
schedule_epoch.epoch
|
|
||||||
);
|
|
||||||
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
|
||||||
(Ok(stake_map), Ok(vote_map)) => {
|
(Ok((stake_map, mut stake_history)), Ok(vote_map)) => {
|
||||||
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule(
|
//For test TODO put in extract and restore process to avoid to clone.
|
||||||
stake_map,
|
log::info!("LeaderScheduleEvent::CalculateScedule");
|
||||||
vote_map,
|
let epoch_schedule = Arc::clone(&epoch_schedule);
|
||||||
schedule_epoch,
|
let jh = tokio::task::spawn_blocking({
|
||||||
stake_history,
|
move || {
|
||||||
))
|
let next_epoch = current_epoch + 1;
|
||||||
}
|
let epoch_vote_stakes = calculate_epoch_stakes(
|
||||||
_ => {
|
&stake_map,
|
||||||
log::warn!("process_leadershedule_event error during extract store");
|
&vote_map,
|
||||||
let jh = tokio::spawn(async move {
|
current_epoch,
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
next_epoch,
|
||||||
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch)
|
stake_history.as_mut(),
|
||||||
|
&epoch_schedule,
|
||||||
|
);
|
||||||
|
|
||||||
|
let leader_schedule = calculate_leader_schedule(
|
||||||
|
&epoch_vote_stakes,
|
||||||
|
next_epoch,
|
||||||
|
slots_in_epoch,
|
||||||
|
);
|
||||||
|
|
||||||
|
// let epoch_vote_stakes_old = calculate_epoch_stakes_old_algo(
|
||||||
|
// &stake_map,
|
||||||
|
// &vote_map,
|
||||||
|
// &epoch_state.next_epoch,
|
||||||
|
// stake_history.as_ref(),
|
||||||
|
// );
|
||||||
|
|
||||||
|
// let leader_schedule_old =
|
||||||
|
// calculate_leader_schedule(&epoch_vote_stakes_old, &next_epoch);
|
||||||
|
if let Err(err) = save_schedule_vote_stakes(
|
||||||
|
SCHEDULE_STAKE_BASE_FILE_NAME,
|
||||||
|
&epoch_vote_stakes,
|
||||||
|
next_epoch,
|
||||||
|
) {
|
||||||
|
log::error!(
|
||||||
|
"Error during saving in file of the new schedule of epoch:{} error:{err}",
|
||||||
|
next_epoch
|
||||||
|
);
|
||||||
|
}
|
||||||
|
log::info!("End calculate leader schedule");
|
||||||
|
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
||||||
|
stake_map,
|
||||||
|
vote_map,
|
||||||
|
LeaderScheduleData {
|
||||||
|
schedule: Arc::new(leader_schedule),
|
||||||
|
vote_stakes: epoch_vote_stakes,
|
||||||
|
epoch: next_epoch,
|
||||||
|
},
|
||||||
|
(current_epoch, slots_in_epoch, epoch_schedule),
|
||||||
|
stake_history,
|
||||||
|
)
|
||||||
|
}
|
||||||
});
|
});
|
||||||
LeaderScheduleResult::TaskHandle(jh)
|
LeaderScheduleResult::TaskHandle(jh)
|
||||||
}
|
}
|
||||||
}
|
_ => {
|
||||||
}
|
log::error!("Create leadershedule init event error during extract store");
|
||||||
LeaderScheduleEvent::CalculateScedule(
|
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
|
||||||
stake_map,
|
current_epoch,
|
||||||
vote_map,
|
slots_in_epoch,
|
||||||
schedule_epoch,
|
epoch_schedule,
|
||||||
stake_history,
|
))
|
||||||
) => {
|
|
||||||
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
|
|
||||||
let jh = tokio::task::spawn_blocking({
|
|
||||||
move || {
|
|
||||||
let schedule_and_stakes =
|
|
||||||
crate::leader_schedule::calculate_leader_schedule_from_stake_map(
|
|
||||||
&stake_map,
|
|
||||||
&vote_map,
|
|
||||||
&schedule_epoch,
|
|
||||||
stake_history.as_ref(),
|
|
||||||
);
|
|
||||||
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
|
|
||||||
if let Err(err) = save_schedule_vote_stakes(
|
|
||||||
SCHEDULE_STAKE_BASE_FILE_NAME,
|
|
||||||
vote_stakes,
|
|
||||||
schedule_epoch.epoch,
|
|
||||||
) {
|
|
||||||
log::error!(
|
|
||||||
"Error during saving in file of the new schedule of epoch:{} error:{err}",
|
|
||||||
schedule_epoch.epoch
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log::info!("End calculate leader schedule");
|
|
||||||
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
|
||||||
stake_map,
|
|
||||||
vote_map,
|
|
||||||
schedule_and_stakes,
|
|
||||||
schedule_epoch,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
LeaderScheduleResult::TaskHandle(jh)
|
|
||||||
}
|
}
|
||||||
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
|
||||||
stake_map,
|
stake_map,
|
||||||
vote_map,
|
vote_map,
|
||||||
schedule,
|
schedule_data,
|
||||||
schedule_epoch,
|
(current_epoch, slots_in_epoch, epoch_schedule),
|
||||||
|
stake_history,
|
||||||
) => {
|
) => {
|
||||||
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
|
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
|
||||||
match (
|
match (
|
||||||
merge_stakestore(stakestore, stake_map, schedule_epoch.epoch),
|
merge_stakestore(stakestore, stake_map, stake_history),
|
||||||
merge_votestore(votestore, vote_map),
|
merge_votestore(votestore, vote_map),
|
||||||
) {
|
) {
|
||||||
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule, schedule_epoch),
|
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
|
||||||
_ => {
|
_ => {
|
||||||
//this shoud never arrive because the store has been extracted before.
|
//this shoud never arrive because the store has been extracted before.
|
||||||
//TODO remove this error using type state
|
//TODO remove this error using type state
|
||||||
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
|
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
|
||||||
LeaderScheduleResult::Event(LeaderScheduleEvent::InitLeaderschedule(
|
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
|
||||||
schedule_epoch,
|
current_epoch,
|
||||||
|
slots_in_epoch,
|
||||||
|
epoch_schedule,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -303,65 +321,68 @@ fn process_leadershedule_event(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// fn calculate_epoch_stakes(stake_map: &StakeMap) {
|
fn calculate_epoch_stakes(
|
||||||
// let stake_history_entry =
|
stake_map: &StakeMap,
|
||||||
|
vote_map: &VoteMap,
|
||||||
|
current_epoch: u64,
|
||||||
|
next_epoch: u64,
|
||||||
|
mut stake_history: Option<&mut StakeHistory>,
|
||||||
|
epoch_schedule: &EpochSchedule,
|
||||||
|
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
|
||||||
|
//update stake history with current end epoch stake values.
|
||||||
|
let new_rate_activation_epoch =
|
||||||
|
FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule);
|
||||||
|
let stake_history_entry =
|
||||||
|
stake_map
|
||||||
|
.values()
|
||||||
|
.fold(StakeActivationStatus::default(), |acc, stake_account| {
|
||||||
|
let delegation = stake_account.stake;
|
||||||
|
acc + delegation.stake_activating_and_deactivating(
|
||||||
|
current_epoch,
|
||||||
|
stake_history.as_deref(),
|
||||||
|
new_rate_activation_epoch,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
match stake_history {
|
||||||
|
Some(ref mut stake_history) => stake_history.add(current_epoch, stake_history_entry),
|
||||||
|
None => log::warn!("Vote stake calculus without Stake History"),
|
||||||
|
};
|
||||||
|
|
||||||
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
|
//calculate schedule stakes.
|
||||||
// // Wrap up the prev epoch by adding new stake history entry for the
|
let delegated_stakes: HashMap<Pubkey, u64> =
|
||||||
// // prev epoch.
|
stake_map
|
||||||
// let stake_history_entry = thread_pool.install(|| {
|
.values()
|
||||||
// stake_delegations
|
.fold(HashMap::default(), |mut delegated_stakes, stake_account| {
|
||||||
// .par_iter()
|
let delegation = stake_account.stake;
|
||||||
// .fold(StakeActivationStatus::default, |acc, stake_account| {
|
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
|
||||||
// let delegation = stake_account.delegation();
|
*entry += delegation.stake(
|
||||||
// acc + delegation.stake_activating_and_deactivating(
|
next_epoch,
|
||||||
// self.epoch,
|
stake_history.as_deref(),
|
||||||
// Some(&self.stake_history),
|
new_rate_activation_epoch,
|
||||||
// new_rate_activation_epoch,
|
);
|
||||||
// )
|
delegated_stakes
|
||||||
// })
|
});
|
||||||
// .reduce(StakeActivationStatus::default, Add::add)
|
|
||||||
// });
|
|
||||||
// self.stake_history.add(self.epoch, stake_history_entry);
|
|
||||||
// self.epoch = next_epoch;
|
|
||||||
// // Refresh the stake distribution of vote accounts for the next epoch,
|
|
||||||
// // using new stake history.
|
|
||||||
// let delegated_stakes = thread_pool.install(|| {
|
|
||||||
// stake_delegations
|
|
||||||
// .par_iter()
|
|
||||||
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
|
|
||||||
// let delegation = stake_account.delegation();
|
|
||||||
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
|
|
||||||
// *entry += delegation.stake(
|
|
||||||
// self.epoch,
|
|
||||||
// Some(&self.stake_history),
|
|
||||||
// new_rate_activation_epoch,
|
|
||||||
// );
|
|
||||||
// delegated_stakes
|
|
||||||
// })
|
|
||||||
// .reduce(HashMap::default, merge)
|
|
||||||
// });
|
|
||||||
// self.vote_accounts = self
|
|
||||||
// .vote_accounts
|
|
||||||
// .iter()
|
|
||||||
// .map(|(&vote_pubkey, vote_account)| {
|
|
||||||
// let delegated_stake = delegated_stakes
|
|
||||||
// .get(&vote_pubkey)
|
|
||||||
// .copied()
|
|
||||||
// .unwrap_or_default();
|
|
||||||
// (vote_pubkey, (delegated_stake, vote_account.clone()))
|
|
||||||
// })
|
|
||||||
// .collect();
|
|
||||||
|
|
||||||
// }
|
let staked_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)> = vote_map
|
||||||
|
.values()
|
||||||
|
.map(|vote_account| {
|
||||||
|
let delegated_stake = delegated_stakes
|
||||||
|
.get(&vote_account.pubkey)
|
||||||
|
.copied()
|
||||||
|
.unwrap_or_default();
|
||||||
|
(vote_account.pubkey, (delegated_stake, vote_account.clone()))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
staked_vote_map
|
||||||
|
}
|
||||||
|
|
||||||
fn calculate_leader_schedule_from_stake_map(
|
fn calculate_epoch_stakes_old_algo(
|
||||||
stake_map: &crate::stakestore::StakeMap,
|
stake_map: &StakeMap,
|
||||||
vote_map: &crate::votestore::VoteMap,
|
vote_map: &VoteMap,
|
||||||
current_epoch_info: &EpochInfo,
|
next_epoch: &Epoch,
|
||||||
stake_history: Option<&StakeHistory>,
|
stake_history: Option<&StakeHistory>,
|
||||||
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
|
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
|
||||||
let mut stakes = HashMap::<Pubkey, u64>::new();
|
let mut stakes = HashMap::<Pubkey, (u64, Arc<StoredVote>)>::new();
|
||||||
log::info!(
|
log::info!(
|
||||||
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
|
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
|
||||||
vote_map.len(),
|
vote_map.len(),
|
||||||
|
@ -369,7 +390,7 @@ fn calculate_leader_schedule_from_stake_map(
|
||||||
stake_history.as_ref().map(|h| h.len())
|
stake_history.as_ref().map(|h| h.len())
|
||||||
);
|
);
|
||||||
|
|
||||||
let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
|
let ten_epoch_slot_long = 10 * next_epoch.slots_in_epoch;
|
||||||
|
|
||||||
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
|
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
|
||||||
for storestake in stake_map.values() {
|
for storestake in stake_map.values() {
|
||||||
|
@ -392,47 +413,149 @@ fn calculate_leader_schedule_from_stake_map(
|
||||||
"leader schedule vote:{} with None root_slot, add it",
|
"leader schedule vote:{} with None root_slot, add it",
|
||||||
vote_account.pubkey
|
vote_account.pubkey
|
||||||
);
|
);
|
||||||
current_epoch_info.absolute_slot
|
next_epoch.absolute_slot
|
||||||
}) < current_epoch_info
|
}) < next_epoch.absolute_slot.saturating_sub(ten_epoch_slot_long)
|
||||||
.absolute_slot
|
|
||||||
.saturating_sub(ten_epoch_slot_long)
|
|
||||||
{
|
{
|
||||||
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||||
, storestake.stake.voter_pubkey
|
, storestake.stake.voter_pubkey
|
||||||
,vote_account.vote_data.node_pubkey
|
,vote_account.vote_data.node_pubkey
|
||||||
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||||
, storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
|
, storestake.stake.stake(next_epoch.epoch, stake_history, Some(0)),
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
let effective_stake = storestake
|
let effective_stake = storestake
|
||||||
.stake
|
.stake
|
||||||
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
.stake(next_epoch.epoch, stake_history, Some(0));
|
||||||
//only vote account with positive stake are use for the schedule.
|
|
||||||
if effective_stake > 0 {
|
stakes
|
||||||
*(stakes
|
.entry(vote_account.pubkey)
|
||||||
.entry(vote_account.vote_data.node_pubkey)
|
.or_insert((0, vote_account.clone()))
|
||||||
.or_insert(0)) += effective_stake;
|
.0 += effective_stake;
|
||||||
} else {
|
|
||||||
log::info!(
|
// //only vote account with positive stake are use for the schedule.
|
||||||
"leader schedule vote:{} with 0 effective vote",
|
// if effective_stake > 0 {
|
||||||
storestake.stake.voter_pubkey
|
// *(stakes
|
||||||
);
|
// .entry(vote_account.vote_data.node_pubkey)
|
||||||
}
|
// .or_insert(0)) += effective_stake;
|
||||||
|
// } else {
|
||||||
|
// log::info!(
|
||||||
|
// "leader schedule vote:{} with 0 effective vote",
|
||||||
|
// storestake.stake.voter_pubkey
|
||||||
|
// );
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
stakes
|
||||||
let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
|
|
||||||
schedule_stakes.extend(stakes.drain());
|
|
||||||
|
|
||||||
let leader_schedule = calculate_leader_schedule(
|
|
||||||
&mut schedule_stakes,
|
|
||||||
current_epoch_info.epoch,
|
|
||||||
current_epoch_info.slots_in_epoch,
|
|
||||||
)?;
|
|
||||||
Ok((leader_schedule, schedule_stakes))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Copied from leader_schedule_utils.rs
|
||||||
|
// Mostly cribbed from leader_schedule_utils
|
||||||
|
fn calculate_leader_schedule(
|
||||||
|
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
|
||||||
|
epoch: u64,
|
||||||
|
slots_in_epoch: u64,
|
||||||
|
) -> HashMap<String, Vec<usize>> {
|
||||||
|
let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(pk, (stake, _))| (*stake > 0).then_some((*pk, *stake)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut seed = [0u8; 32];
|
||||||
|
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
||||||
|
sort_stakes(&mut stakes);
|
||||||
|
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
|
||||||
|
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
||||||
|
|
||||||
|
let slot_schedule = schedule
|
||||||
|
.get_slot_leaders()
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, pk)| (pk.to_string(), i))
|
||||||
|
.into_group_map()
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
slot_schedule
|
||||||
|
}
|
||||||
|
|
||||||
|
// fn calculate_leader_schedule_from_stake_map(
|
||||||
|
// stake_map: &crate::stakestore::StakeMap,
|
||||||
|
// vote_map: &crate::votestore::VoteMap,
|
||||||
|
// current_epoch_info: &EpochInfo,
|
||||||
|
// stake_history: Option<&StakeHistory>,
|
||||||
|
// ) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
|
||||||
|
// let mut stakes = HashMap::<Pubkey, u64>::new();
|
||||||
|
// log::info!(
|
||||||
|
// "calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
|
||||||
|
// vote_map.len(),
|
||||||
|
// stake_map.len(),
|
||||||
|
// stake_history.as_ref().map(|h| h.len())
|
||||||
|
// );
|
||||||
|
|
||||||
|
// let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
|
||||||
|
|
||||||
|
// //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
|
||||||
|
// for storestake in stake_map.values() {
|
||||||
|
// //get nodeid for vote account
|
||||||
|
// let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
|
||||||
|
// log::info!(
|
||||||
|
// "Vote account not found in vote map for stake vote account:{}",
|
||||||
|
// &storestake.stake.voter_pubkey
|
||||||
|
// );
|
||||||
|
// continue;
|
||||||
|
// };
|
||||||
|
// //TODO validate the number of epoch.
|
||||||
|
// //remove vote account that hasn't vote since 10 epoch.
|
||||||
|
// //on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
|
||||||
|
// //hasn't vote since a long time but still return on RPC call get_voteaccounts.
|
||||||
|
// //the validator don't use it for leader schedule.
|
||||||
|
// if vote_account.vote_data.root_slot.unwrap_or_else(|| {
|
||||||
|
// //vote with no root_slot are added. They have just been activated and can have active stake- TODO TO TEST.
|
||||||
|
// log::info!(
|
||||||
|
// "leader schedule vote:{} with None root_slot, add it",
|
||||||
|
// vote_account.pubkey
|
||||||
|
// );
|
||||||
|
// current_epoch_info.absolute_slot
|
||||||
|
// }) < current_epoch_info
|
||||||
|
// .absolute_slot
|
||||||
|
// .saturating_sub(ten_epoch_slot_long)
|
||||||
|
// {
|
||||||
|
// log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
|
||||||
|
// , storestake.stake.voter_pubkey
|
||||||
|
// ,vote_account.vote_data.node_pubkey
|
||||||
|
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||||
|
// , storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
|
||||||
|
// );
|
||||||
|
// } else {
|
||||||
|
// let effective_stake = storestake
|
||||||
|
// .stake
|
||||||
|
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
|
||||||
|
// .stake(current_epoch_info.epoch, stake_history, Some(0));
|
||||||
|
// //only vote account with positive stake are use for the schedule.
|
||||||
|
// if effective_stake > 0 {
|
||||||
|
// *(stakes
|
||||||
|
// .entry(vote_account.vote_data.node_pubkey)
|
||||||
|
// .or_insert(0)) += effective_stake;
|
||||||
|
// } else {
|
||||||
|
// log::info!(
|
||||||
|
// "leader schedule vote:{} with 0 effective vote",
|
||||||
|
// storestake.stake.voter_pubkey
|
||||||
|
// );
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
|
||||||
|
// schedule_stakes.extend(stakes.drain());
|
||||||
|
|
||||||
|
// let leader_schedule = calculate_leader_schedule_old(
|
||||||
|
// &mut schedule_stakes,
|
||||||
|
// current_epoch_info.epoch,
|
||||||
|
// current_epoch_info.slots_in_epoch,
|
||||||
|
// )?;
|
||||||
|
// Ok((leader_schedule, schedule_stakes))
|
||||||
|
// }
|
||||||
|
|
||||||
// fn is_stake_to_add(
|
// fn is_stake_to_add(
|
||||||
// stake_pubkey: Pubkey,
|
// stake_pubkey: Pubkey,
|
||||||
// stake: &Delegation,
|
// stake: &Delegation,
|
||||||
|
@ -461,30 +584,30 @@ fn calculate_leader_schedule_from_stake_map(
|
||||||
|
|
||||||
//Copied from leader_schedule_utils.rs
|
//Copied from leader_schedule_utils.rs
|
||||||
// Mostly cribbed from leader_schedule_utils
|
// Mostly cribbed from leader_schedule_utils
|
||||||
fn calculate_leader_schedule(
|
// fn calculate_leader_schedule_old(
|
||||||
stakes: &mut Vec<(Pubkey, u64)>,
|
// stakes: &mut Vec<(Pubkey, u64)>,
|
||||||
epoch: u64,
|
// epoch: u64,
|
||||||
slots_in_epoch: u64,
|
// slots_in_epoch: u64,
|
||||||
) -> anyhow::Result<HashMap<String, Vec<usize>>> {
|
// ) -> anyhow::Result<HashMap<String, Vec<usize>>> {
|
||||||
if stakes.is_empty() {
|
// if stakes.is_empty() {
|
||||||
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
|
// bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
|
||||||
}
|
// }
|
||||||
let mut seed = [0u8; 32];
|
// let mut seed = [0u8; 32];
|
||||||
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
// seed[0..8].copy_from_slice(&epoch.to_le_bytes());
|
||||||
sort_stakes(stakes);
|
// sort_stakes(stakes);
|
||||||
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
|
// log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
|
||||||
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
// let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
|
||||||
|
|
||||||
let slot_schedule = schedule
|
// let slot_schedule = schedule
|
||||||
.get_slot_leaders()
|
// .get_slot_leaders()
|
||||||
.iter()
|
// .iter()
|
||||||
.enumerate()
|
// .enumerate()
|
||||||
.map(|(i, pk)| (pk.to_string(), i))
|
// .map(|(i, pk)| (pk.to_string(), i))
|
||||||
.into_group_map()
|
// .into_group_map()
|
||||||
.into_iter()
|
// .into_iter()
|
||||||
.collect();
|
// .collect();
|
||||||
Ok(slot_schedule)
|
// Ok(slot_schedule)
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Cribbed from leader_schedule_utils
|
// Cribbed from leader_schedule_utils
|
||||||
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
|
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
|
||||||
|
@ -603,7 +726,7 @@ use solana_sdk::stake::state::StakeState;
|
||||||
pub fn build_current_stakes(
|
pub fn build_current_stakes(
|
||||||
stake_map: &crate::stakestore::StakeMap,
|
stake_map: &crate::stakestore::StakeMap,
|
||||||
stake_history: Option<&StakeHistory>,
|
stake_history: Option<&StakeHistory>,
|
||||||
current_epoch_info: &EpochInfo,
|
current_epoch: u64,
|
||||||
rpc_url: String,
|
rpc_url: String,
|
||||||
commitment: CommitmentConfig,
|
commitment: CommitmentConfig,
|
||||||
) -> BTreeMap<String, (u64, u64)> {
|
) -> BTreeMap<String, (u64, u64)> {
|
||||||
|
@ -628,10 +751,9 @@ pub fn build_current_stakes(
|
||||||
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
|
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
|
||||||
StakeState::Stake(_, stake) => {
|
StakeState::Stake(_, stake) => {
|
||||||
//vote account version
|
//vote account version
|
||||||
let effective_stake =
|
let effective_stake = stake
|
||||||
stake
|
.delegation
|
||||||
.delegation
|
.stake(current_epoch, stake_history, Some(0));
|
||||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
|
||||||
if effective_stake > 0 {
|
if effective_stake > 0 {
|
||||||
// Add the stake in this stake account to the total for the delegated-to vote account
|
// Add the stake in this stake account to the total for the delegated-to vote account
|
||||||
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
|
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
|
||||||
|
@ -657,10 +779,7 @@ pub fn build_current_stakes(
|
||||||
stake_map
|
stake_map
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(pubkey, stake)| {
|
.filter_map(|(pubkey, stake)| {
|
||||||
let effective_stake =
|
let effective_stake = stake.stake.stake(current_epoch, stake_history, Some(0));
|
||||||
stake
|
|
||||||
.stake
|
|
||||||
.stake(current_epoch_info.epoch, stake_history, Some(0));
|
|
||||||
(effective_stake > 0).then(|| (pubkey, stake, effective_stake))
|
(effective_stake > 0).then(|| (pubkey, stake, effective_stake))
|
||||||
})
|
})
|
||||||
.for_each(|(pubkey, stake, effective_stake)| {
|
.for_each(|(pubkey, stake, effective_stake)| {
|
||||||
|
|
|
@ -48,7 +48,6 @@ use solana_sdk::stake::state::Delegation;
|
||||||
use solana_sdk::vote::state::VoteState;
|
use solana_sdk::vote::state::VoteState;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||||
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
use yellowstone_grpc_proto::geyser::CommitmentLevel;
|
||||||
|
@ -169,7 +168,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
|
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
|
||||||
&schedule_current_file,
|
&schedule_current_file,
|
||||||
&schedule_next_file,
|
&schedule_next_file,
|
||||||
current_epoch_state.current_epoch.slots_in_epoch,
|
¤t_epoch_state,
|
||||||
) {
|
) {
|
||||||
Ok(data) => data,
|
Ok(data) => data,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -246,8 +245,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
//Init bootstrap process
|
//Init bootstrap process
|
||||||
let mut bootstrap_data = BootstrapData {
|
let mut bootstrap_data = BootstrapData {
|
||||||
done: false,
|
done: false,
|
||||||
current_epoch: current_epoch_state.current_epoch.epoch,
|
|
||||||
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
|
|
||||||
sleep_time: 1,
|
sleep_time: 1,
|
||||||
rpc_url: RPC_URL.to_string(),
|
rpc_url: RPC_URL.to_string(),
|
||||||
};
|
};
|
||||||
|
@ -261,7 +258,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
|
|
||||||
//TODO remove. Store parent hash to see if we don't miss a block.
|
//TODO remove. Store parent hash to see if we don't miss a block.
|
||||||
let mut parent_block_slot = None;
|
let mut parent_block_slot = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(req) = request_rx.recv() => {
|
Some(req) = request_rx.recv() => {
|
||||||
|
@ -270,13 +266,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
tokio::task::spawn_blocking({
|
tokio::task::spawn_blocking({
|
||||||
log::info!("RPC start save_stakes");
|
log::info!("RPC start save_stakes");
|
||||||
let current_stakes = stakestore.get_cloned_stake_map();
|
let current_stakes = stakestore.get_cloned_stake_map();
|
||||||
let history = stakestore.get_cloned_stake_history();
|
let move_epoch = current_epoch_state.get_current_epoch();
|
||||||
let move_epoch = current_epoch_state.current_epoch.clone();
|
let stake_history = stakestore.get_stake_history();
|
||||||
move || {
|
move || {
|
||||||
let current_stake = crate::leader_schedule::build_current_stakes(
|
let current_stake = crate::leader_schedule::build_current_stakes(
|
||||||
¤t_stakes,
|
¤t_stakes,
|
||||||
history.as_ref(),
|
stake_history.as_ref(),
|
||||||
&move_epoch,
|
move_epoch.epoch,
|
||||||
RPC_URL.to_string(),
|
RPC_URL.to_string(),
|
||||||
CommitmentConfig::confirmed(),
|
CommitmentConfig::confirmed(),
|
||||||
);
|
);
|
||||||
|
@ -332,45 +328,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
}
|
}
|
||||||
//Manage RPC call result execution
|
//Manage RPC call result execution
|
||||||
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
|
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
|
||||||
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
|
let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
|
||||||
RPC_URL.to_string(),
|
RPC_URL.to_string(),
|
||||||
event,
|
event,
|
||||||
&mut spawned_leader_schedule_task,
|
&mut spawned_leader_schedule_task,
|
||||||
&mut stakestore,
|
&mut stakestore,
|
||||||
&mut votestore,
|
&mut votestore,
|
||||||
) {
|
);
|
||||||
|
leader_schedule_data.current = leader_schedule_data.next.take();
|
||||||
leader_schedule_data.current = leader_schedule_data.next.take();
|
leader_schedule_data.next = new_leader_schedule;
|
||||||
match new_schedule {
|
|
||||||
Ok((new_schedule, new_stakes)) => {
|
|
||||||
leader_schedule_data.next = Some(LeaderScheduleData {
|
|
||||||
schedule: Arc::new(new_schedule),
|
|
||||||
vote_stakes: new_stakes,
|
|
||||||
epoch: epoch.epoch,
|
|
||||||
});
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Error during new leader schedule generation:{err}");
|
|
||||||
log::error!("No leader schedule available for next epoch.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO remove verification when schedule ok.
|
|
||||||
//verify calculated shedule with the one the RPC return.
|
|
||||||
// if let Some(schedule) = new_schedule {
|
|
||||||
// tokio::task::spawn_blocking(|| {
|
|
||||||
// //10 second that the schedule has been calculated on the validator
|
|
||||||
// std::thread::sleep(std::time::Duration::from_secs(5));
|
|
||||||
// log::info!("Start Verify schedule");
|
|
||||||
// if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
|
|
||||||
// log::warn!("Error during schedule verification:{err}");
|
|
||||||
// }
|
|
||||||
// log::info!("End Verify schedule");
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//get confirmed slot or account
|
//get confirmed slot or account
|
||||||
|
@ -392,7 +358,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
|
||||||
if let Err(err) = stakestore.notify_stake_change(
|
if let Err(err) = stakestore.notify_stake_change(
|
||||||
account,
|
account,
|
||||||
current_epoch_state.current_epoch_end_slot(),
|
current_epoch_state.current_epoch_end_slot(),
|
||||||
current_epoch_state.current_epoch.epoch,
|
|
||||||
) {
|
) {
|
||||||
log::warn!("Can't add new stake from account data err:{}", err);
|
log::warn!("Can't add new stake from account data err:{}", err);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -22,7 +22,7 @@ use thiserror::Error;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000";
|
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3001";
|
||||||
const SERVER_ERROR_MSG: &str = "Internal server error";
|
const SERVER_ERROR_MSG: &str = "Internal server error";
|
||||||
|
|
||||||
//internal RPC access
|
//internal RPC access
|
||||||
|
@ -88,7 +88,10 @@ pub struct RPCServer {
|
||||||
|
|
||||||
#[jsonrpsee::core::async_trait]
|
#[jsonrpsee::core::async_trait]
|
||||||
impl ConsensusRpcServer for RPCServer {
|
impl ConsensusRpcServer for RPCServer {
|
||||||
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> {
|
async fn get_latest_blockhash(
|
||||||
|
&self,
|
||||||
|
_config: Option<RpcContextConfig>,
|
||||||
|
) -> Result<RpcBlockhash> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +140,11 @@ pub fn server_rpc_request(
|
||||||
) {
|
) {
|
||||||
match request {
|
match request {
|
||||||
crate::rpc::Requests::EpochInfo(tx) => {
|
crate::rpc::Requests::EpochInfo(tx) => {
|
||||||
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
|
if let Err(err) = tx.send(
|
||||||
|
current_epoch_state
|
||||||
|
.get_current_epoch()
|
||||||
|
.into_epoch_info(0, None),
|
||||||
|
) {
|
||||||
log::warn!("Channel error during sending back request status error:{err:?}");
|
log::warn!("Channel error during sending back request status error:{err:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,11 +165,11 @@ pub fn server_rpc_request(
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
|
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
|
||||||
current_epoch_state.current_epoch.epoch
|
current_epoch_state.get_current_epoch().epoch
|
||||||
);
|
);
|
||||||
//currently only return schedule for current of next epoch.
|
//currently only return schedule for current of next epoch.
|
||||||
let get_schedule_fn = |schedule: &LeaderScheduleData| {
|
let get_schedule_fn = |schedule: &LeaderScheduleData| {
|
||||||
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //Arc clone.
|
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //clone the schedule. TODO clone arc.
|
||||||
};
|
};
|
||||||
let schedule = leader_schedules
|
let schedule = leader_schedules
|
||||||
.current
|
.current
|
||||||
|
@ -285,7 +292,7 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
|
||||||
Err("Error stake store extracted".to_string())
|
Err("Error stake store extracted".to_string())
|
||||||
} else {
|
} else {
|
||||||
//replace pubkey with String. Json only allow distionary key with string.
|
//replace pubkey with String. Json only allow distionary key with string.
|
||||||
let ret: HashMap<String, StoredVote> = accounts
|
let ret: HashMap<String, Arc<StoredVote>> = accounts
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(pk, acc)| (pk.to_string(), acc))
|
.map(|(pk, acc)| (pk.to_string(), acc))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
|
@ -12,13 +12,14 @@ use solana_sdk::stake::state::Delegation;
|
||||||
use solana_sdk::stake::state::StakeState;
|
use solana_sdk::stake::state::StakeState;
|
||||||
use solana_sdk::stake_history::StakeHistory;
|
use solana_sdk::stake_history::StakeHistory;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::str::FromStr;
|
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
|
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
|
||||||
|
|
||||||
pub type StakeMap = HashMap<Pubkey, StoredStake>;
|
pub type StakeMap = HashMap<Pubkey, StoredStake>;
|
||||||
|
|
||||||
pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMap> {
|
pub fn extract_stakestore(
|
||||||
|
stakestore: &mut StakeStore,
|
||||||
|
) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
|
||||||
let new_store = std::mem::take(stakestore);
|
let new_store = std::mem::take(stakestore);
|
||||||
let (new_store, stake_map) = new_store.extract()?;
|
let (new_store, stake_map) = new_store.extract()?;
|
||||||
*stakestore = new_store;
|
*stakestore = new_store;
|
||||||
|
@ -28,21 +29,15 @@ pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMa
|
||||||
pub fn merge_stakestore(
|
pub fn merge_stakestore(
|
||||||
stakestore: &mut StakeStore,
|
stakestore: &mut StakeStore,
|
||||||
stake_map: StakeMap,
|
stake_map: StakeMap,
|
||||||
current_epoch: u64,
|
stake_history: Option<StakeHistory>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let new_store = std::mem::take(stakestore);
|
let new_store = std::mem::take(stakestore);
|
||||||
let new_store = new_store.merge_stakes(stake_map, current_epoch)?;
|
let new_store = new_store.merge_stakes(stake_map, stake_history)?;
|
||||||
*stakestore = new_store;
|
*stakestore = new_store;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch: u64) {
|
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) {
|
||||||
//don't add stake that are already desactivated.
|
|
||||||
//there's some stran,ge stake that has activeate_epock = max epoch and desactivate ecpock 90 that are taken into account.
|
|
||||||
//Must be better defined.
|
|
||||||
// if stake.stake.deactivation_epoch < current_epoch {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
//log::info!("stake_map_notify_stake stake:{stake:?}");
|
//log::info!("stake_map_notify_stake stake:{stake:?}");
|
||||||
match map.entry(stake.pubkey) {
|
match map.entry(stake.pubkey) {
|
||||||
// If value already exists, then increment it by one
|
// If value already exists, then increment it by one
|
||||||
|
@ -70,7 +65,6 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch:
|
||||||
pub enum ExtractedAction {
|
pub enum ExtractedAction {
|
||||||
Notify {
|
Notify {
|
||||||
stake: StoredStake,
|
stake: StoredStake,
|
||||||
current_epoch: u64,
|
|
||||||
},
|
},
|
||||||
Remove(Pubkey, Slot),
|
Remove(Pubkey, Slot),
|
||||||
Merge {
|
Merge {
|
||||||
|
@ -85,18 +79,12 @@ pub enum ExtractedAction {
|
||||||
impl ExtractedAction {
|
impl ExtractedAction {
|
||||||
fn get_update_slot(&self) -> u64 {
|
fn get_update_slot(&self) -> u64 {
|
||||||
match self {
|
match self {
|
||||||
ExtractedAction::Notify { stake, .. } => stake.last_update_slot,
|
ExtractedAction::Notify { stake } => stake.last_update_slot,
|
||||||
ExtractedAction::Remove(_, slot) => *slot,
|
ExtractedAction::Remove(_, slot) => *slot,
|
||||||
ExtractedAction::Merge { update_slot, .. } => *update_slot,
|
ExtractedAction::Merge { update_slot, .. } => *update_slot,
|
||||||
ExtractedAction::None => 0,
|
ExtractedAction::None => 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_epoch(&mut self, epoch: u64) {
|
|
||||||
if let ExtractedAction::Notify { current_epoch, .. } = self {
|
|
||||||
*current_epoch = epoch;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||||
|
@ -108,16 +96,6 @@ pub struct StoredStake {
|
||||||
pub write_version: u64,
|
pub write_version: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
// impl StoredStake {
|
|
||||||
// fn is_removed(&self, current_epoch: u64) -> bool {
|
|
||||||
// self.stake.activation_epoch != crate::leader_schedule::MAX_EPOCH_VALUE
|
|
||||||
// && self.stake.deactivation_epoch < current_epoch
|
|
||||||
// }
|
|
||||||
// fn is_inserted(&self, current_epoch: u64) -> bool {
|
|
||||||
// self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
|
|
||||||
// || self.stake.deactivation_epoch >= current_epoch
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct StakeStore {
|
pub struct StakeStore {
|
||||||
stakes: StakeMap,
|
stakes: StakeMap,
|
||||||
|
@ -136,8 +114,12 @@ impl StakeStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
|
// pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
|
||||||
self.stake_history = Some(stake_history);
|
// self.stake_history = Some(Arc::new(stake_history));
|
||||||
|
// }
|
||||||
|
|
||||||
|
pub fn get_stake_history(&self) -> Option<StakeHistory> {
|
||||||
|
self.stake_history.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn nb_stake_account(&self) -> usize {
|
pub fn nb_stake_account(&self) -> usize {
|
||||||
|
@ -147,48 +129,34 @@ impl StakeStore {
|
||||||
pub fn get_cloned_stake_map(&self) -> StakeMap {
|
pub fn get_cloned_stake_map(&self) -> StakeMap {
|
||||||
self.stakes.clone()
|
self.stakes.clone()
|
||||||
}
|
}
|
||||||
pub fn get_cloned_stake_history(&self) -> Option<StakeHistory> {
|
|
||||||
self.stake_history.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn notify_stake_change(
|
pub fn notify_stake_change(
|
||||||
&mut self,
|
&mut self,
|
||||||
new_account: AccountPretty,
|
account: AccountPretty,
|
||||||
current_end_epoch_slot: Slot,
|
current_end_epoch_slot: Slot,
|
||||||
current_epoch: u64,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let Ok(delegated_stake_opt) = new_account.read_stake() else {
|
//if lamport == 0 the account has been removed.
|
||||||
bail!("Can't read stake from account data");
|
if account.lamports == 0 {
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(delegated_stake) = delegated_stake_opt {
|
|
||||||
let stake = StoredStake {
|
|
||||||
pubkey: new_account.pubkey,
|
|
||||||
lamports: new_account.lamports,
|
|
||||||
stake: delegated_stake,
|
|
||||||
last_update_slot: new_account.slot,
|
|
||||||
write_version: new_account.write_version,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.notify_stake_action(
|
self.notify_stake_action(
|
||||||
ExtractedAction::Notify {
|
ExtractedAction::Remove(account.pubkey, account.slot),
|
||||||
stake,
|
|
||||||
current_epoch,
|
|
||||||
},
|
|
||||||
current_end_epoch_slot,
|
current_end_epoch_slot,
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
let Ok(delegated_stake_opt) = account.read_stake() else {
|
||||||
|
bail!("Can't read stake from account data");
|
||||||
|
};
|
||||||
|
|
||||||
//during extract push the new update or
|
if let Some(delegated_stake) = delegated_stake_opt {
|
||||||
//don't insertnow account change that has been done in next epoch.
|
let stake = StoredStake {
|
||||||
//put in update pool to be merged next epoch change.
|
pubkey: account.pubkey,
|
||||||
// let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot;
|
lamports: account.lamports,
|
||||||
// match insert_stake {
|
stake: delegated_stake,
|
||||||
// false => self.updates.push(ExtractedAction::Notify {
|
last_update_slot: account.slot,
|
||||||
// stake_account: new_account.pubkey,
|
write_version: account.write_version,
|
||||||
// stake: ststake,
|
};
|
||||||
// }),
|
|
||||||
// true => self.notify_stake(new_account.pubkey, ststake, current_epoch),
|
self.notify_stake_action(ExtractedAction::Notify { stake }, current_end_epoch_slot);
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -206,11 +174,8 @@ impl StakeStore {
|
||||||
|
|
||||||
fn process_stake_action(&mut self, action: ExtractedAction) {
|
fn process_stake_action(&mut self, action: ExtractedAction) {
|
||||||
match action {
|
match action {
|
||||||
ExtractedAction::Notify {
|
ExtractedAction::Notify { stake } => {
|
||||||
stake,
|
stake_map_notify_stake(&mut self.stakes, stake);
|
||||||
current_epoch,
|
|
||||||
} => {
|
|
||||||
stake_map_notify_stake(&mut self.stakes, stake, current_epoch);
|
|
||||||
}
|
}
|
||||||
ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot),
|
ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot),
|
||||||
//not use currently. TODO remove.
|
//not use currently. TODO remove.
|
||||||
|
@ -229,27 +194,31 @@ impl StakeStore {
|
||||||
//return the contained stake map to do an external update.
|
//return the contained stake map to do an external update.
|
||||||
// During extract period (between extract and merge) added stake a stored to be processed later.
|
// During extract period (between extract and merge) added stake a stored to be processed later.
|
||||||
//if the store is already extracted return an error.
|
//if the store is already extracted return an error.
|
||||||
fn extract(self) -> anyhow::Result<(Self, StakeMap)> {
|
fn extract(self) -> anyhow::Result<(Self, (StakeMap, Option<StakeHistory>))> {
|
||||||
if self.extracted {
|
if self.extracted {
|
||||||
bail!("StakeStore already extracted. Try later");
|
bail!("StakeStore already extracted. Try later");
|
||||||
}
|
}
|
||||||
let stakestore = StakeStore {
|
let stakestore = StakeStore {
|
||||||
stakes: HashMap::new(),
|
stakes: HashMap::new(),
|
||||||
stake_history: self.stake_history,
|
stake_history: None,
|
||||||
updates: self.updates,
|
updates: self.updates,
|
||||||
extracted: true,
|
extracted: true,
|
||||||
};
|
};
|
||||||
Ok((stakestore, self.stakes))
|
Ok((stakestore, (self.stakes, self.stake_history)))
|
||||||
}
|
}
|
||||||
|
|
||||||
//Merge the stake map an,d replace the current one.
|
//Merge the stake map an,d replace the current one.
|
||||||
fn merge_stakes(mut self, stakes: StakeMap, current_epoch: u64) -> anyhow::Result<Self> {
|
fn merge_stakes(
|
||||||
|
mut self,
|
||||||
|
stakes: StakeMap,
|
||||||
|
stake_history: Option<StakeHistory>,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
if !self.extracted {
|
if !self.extracted {
|
||||||
bail!("StakeStore merge of non extracted map. Try later");
|
bail!("StakeStore merge of non extracted map. Try later");
|
||||||
}
|
}
|
||||||
let mut stakestore = StakeStore {
|
let mut stakestore = StakeStore {
|
||||||
stakes,
|
stakes,
|
||||||
stake_history: self.stake_history,
|
stake_history,
|
||||||
updates: vec![],
|
updates: vec![],
|
||||||
extracted: false,
|
extracted: false,
|
||||||
};
|
};
|
||||||
|
@ -257,25 +226,9 @@ impl StakeStore {
|
||||||
self.updates
|
self.updates
|
||||||
.sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot()));
|
.sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot()));
|
||||||
//apply stake added during extraction.
|
//apply stake added during extraction.
|
||||||
for mut action in self.updates {
|
for action in self.updates {
|
||||||
action.update_epoch(current_epoch);
|
|
||||||
stakestore.process_stake_action(action);
|
stakestore.process_stake_action(action);
|
||||||
}
|
}
|
||||||
|
|
||||||
//verify one stake account to test. TODO remove
|
|
||||||
let stake_account = stakestore
|
|
||||||
.stakes
|
|
||||||
.get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
|
|
||||||
let stake = stake_account.map(|stake| {
|
|
||||||
stake
|
|
||||||
.stake
|
|
||||||
.stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
|
|
||||||
});
|
|
||||||
log::info!(
|
|
||||||
"merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
|
|
||||||
stake
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(stakestore)
|
Ok(stakestore)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,7 +239,7 @@ impl StakeStore {
|
||||||
.map(|stake| stake.last_update_slot <= update_slot)
|
.map(|stake| stake.last_update_slot <= update_slot)
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
{
|
{
|
||||||
log::info!("remove_from_store for {}", account_pk.to_string());
|
log::info!("Stake remove_from_store for {}", account_pk.to_string());
|
||||||
self.stakes.remove(account_pk);
|
self.stakes.remove(account_pk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -296,7 +249,6 @@ pub fn merge_program_account_in_strake_map(
|
||||||
stake_map: &mut StakeMap,
|
stake_map: &mut StakeMap,
|
||||||
stakes_list: Vec<(Pubkey, Account)>,
|
stakes_list: Vec<(Pubkey, Account)>,
|
||||||
last_update_slot: Slot,
|
last_update_slot: Slot,
|
||||||
current_epoch: u64,
|
|
||||||
) {
|
) {
|
||||||
stakes_list
|
stakes_list
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -318,7 +270,7 @@ pub fn merge_program_account_in_strake_map(
|
||||||
write_version: 0,
|
write_version: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
stake_map_notify_stake(stake_map, stake, current_epoch);
|
stake_map_notify_stake(stake_map, stake);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -764,21 +716,21 @@ pub async fn process_stake_tx_message(
|
||||||
current_end_epoch_slot,
|
current_end_epoch_slot,
|
||||||
);
|
);
|
||||||
|
|
||||||
send_verification(
|
// send_verification(
|
||||||
stake_sender,
|
// stake_sender,
|
||||||
stakestore,
|
// stakestore,
|
||||||
"Merge Destination",
|
// "Merge Destination",
|
||||||
account_keys[instruction.accounts[0] as usize],
|
// account_keys[instruction.accounts[0] as usize],
|
||||||
)
|
// )
|
||||||
.await;
|
// .await;
|
||||||
|
|
||||||
send_verification(
|
// send_verification(
|
||||||
stake_sender,
|
// stake_sender,
|
||||||
stakestore,
|
// stakestore,
|
||||||
"Merge Source",
|
// "Merge Source",
|
||||||
account_keys[instruction.accounts[1] as usize],
|
// account_keys[instruction.accounts[1] as usize],
|
||||||
)
|
// )
|
||||||
.await;
|
// .await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,8 +6,9 @@ use solana_sdk::account::Account;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::vote::state::VoteState;
|
use solana_sdk::vote::state::VoteState;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub type VoteMap = HashMap<Pubkey, StoredVote>;
|
pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
|
||||||
|
|
||||||
//Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct
|
//Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct
|
||||||
//to avoid to add a dep to solana_rpc_client that create
|
//to avoid to add a dep to solana_rpc_client that create
|
||||||
|
@ -104,32 +105,50 @@ impl VoteStore {
|
||||||
new_account: AccountPretty,
|
new_account: AccountPretty,
|
||||||
current_end_epoch_slot: Slot,
|
current_end_epoch_slot: Slot,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let Ok(vote_data) = new_account.read_vote() else {
|
if new_account.lamports == 0 {
|
||||||
bail!("Can't read Vote from account data");
|
self.remove_from_store(&new_account.pubkey, new_account.slot);
|
||||||
};
|
} else {
|
||||||
|
let Ok(vote_data) = new_account.read_vote() else {
|
||||||
|
bail!("Can't read Vote from account data");
|
||||||
|
};
|
||||||
|
|
||||||
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
|
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
|
||||||
|
|
||||||
let new_voteacc = StoredVote {
|
let new_voteacc = StoredVote {
|
||||||
pubkey: new_account.pubkey,
|
pubkey: new_account.pubkey,
|
||||||
vote_data,
|
vote_data,
|
||||||
last_update_slot: new_account.slot,
|
last_update_slot: new_account.slot,
|
||||||
write_version: new_account.write_version,
|
write_version: new_account.write_version,
|
||||||
};
|
};
|
||||||
|
|
||||||
//during extract push the new update or
|
//during extract push the new update or
|
||||||
//don't insertnow account change that has been done in next epoch.
|
//don't insertnow account change that has been done in next epoch.
|
||||||
//put in update pool to be merged next epoch change.
|
//put in update pool to be merged next epoch change.
|
||||||
let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
let insert_stake =
|
||||||
match insert_stake {
|
!self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
|
||||||
false => self.updates.push((new_account.pubkey, new_voteacc)),
|
match insert_stake {
|
||||||
true => self.insert_vote(new_account.pubkey, new_voteacc),
|
false => self.updates.push((new_account.pubkey, new_voteacc)),
|
||||||
|
true => self.insert_vote(new_account.pubkey, new_voteacc),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
|
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
|
||||||
vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
|
vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
|
||||||
|
if self
|
||||||
|
.votes
|
||||||
|
.get(account_pk)
|
||||||
|
.map(|vote| vote.last_update_slot <= update_slot)
|
||||||
|
.unwrap_or(true)
|
||||||
|
{
|
||||||
|
log::info!("Vote remove_from_store for {}", account_pk.to_string());
|
||||||
|
self.votes.remove(account_pk);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge_program_account_in_vote_map(
|
pub fn merge_program_account_in_vote_map(
|
||||||
|
@ -180,7 +199,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
*voteacc = vote_data;
|
*voteacc = Arc::new(vote_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If value doesn't exist yet, then insert a new value of 1
|
// If value doesn't exist yet, then insert a new value of 1
|
||||||
|
@ -190,7 +209,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
|
||||||
vote_data.vote_data.node_pubkey,
|
vote_data.vote_data.node_pubkey,
|
||||||
vote_data.vote_data.root_slot,
|
vote_data.vote_data.root_slot,
|
||||||
);
|
);
|
||||||
vacant.insert(vote_data);
|
vacant.insert(Arc::new(vote_data));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue