Compare commits

...

3 Commits

Author SHA1 Message Date
musitdev 1c6aa5fe8b change open port 2023-10-06 17:32:56 +02:00
musitdev f91965e1dd manage account removal from notification 2023-10-06 17:20:17 +02:00
musitdev db8a3079cb add stake history calculus and refactor to optimize 2023-10-06 16:25:21 +02:00
7 changed files with 610 additions and 544 deletions

View File

@ -1,12 +1,12 @@
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore}; use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore}; use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use crate::Slot;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError; use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient; use solana_client::rpc_client::RpcClient;
use solana_sdk::account::Account; use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake_history::StakeHistory;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
/* /*
@ -64,7 +64,7 @@ pub enum BootstrapEvent {
Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>,
Account, Account,
), ),
AccountsMerged(StakeMap, VoteMap), AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap),
Exit, Exit,
} }
@ -76,8 +76,6 @@ enum BootsrapProcessResult {
pub struct BootstrapData { pub struct BootstrapData {
pub done: bool, pub done: bool,
pub current_epoch: u64,
pub next_epoch_start_slot: Slot,
pub sleep_time: u64, pub sleep_time: u64,
pub rpc_url: String, pub rpc_url: String,
} }
@ -116,7 +114,7 @@ fn process_bootstrap_event(
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => { BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV"); log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (extract_stakestore(stakestore), extract_votestore(votestore)) { match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event( (Ok((stake_map, _)), Ok(vote_map)) => BootsrapProcessResult::Event(
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history), BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
), ),
_ => { _ => {
@ -131,27 +129,20 @@ fn process_bootstrap_event(
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => { BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
log::info!("BootstrapEvent::StoreExtracted RECV"); log::info!("BootstrapEvent::StoreExtracted RECV");
match crate::stakestore::read_historystake_from_account(history) { let stake_history = crate::stakestore::read_historystake_from_account(history);
Some(stake_history) => { if stake_history.is_none() {
log::info!( //TODO return error.
"Read stake history done with history len:{}", log::error!("Bootstrap error, can't read stake history.");
stake_history.len()
);
stakestore.set_stake_history(stake_history);
}
None => log::error!("Bootstrap error, can't read stake history."),
} }
//merge new PA with stake map and vote map in a specific task //merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({ let jh = tokio::task::spawn_blocking({
let current_epoch = data.current_epoch;
move || { move || {
//update pa_list to set slot update to start epoq one. //update pa_list to set slot update to start epoq one.
crate::stakestore::merge_program_account_in_strake_map( crate::stakestore::merge_program_account_in_strake_map(
&mut stake_map, &mut stake_map,
stakes, stakes,
0, //with RPC no way to know the slot of the account update. Set to 0. 0, //with RPC no way to know the slot of the account update. Set to 0.
current_epoch,
); );
crate::votestore::merge_program_account_in_vote_map( crate::votestore::merge_program_account_in_vote_map(
&mut vote_map, &mut vote_map,
@ -159,15 +150,15 @@ fn process_bootstrap_event(
0, //with RPC no way to know the slot of the account update. Set to 0. 0, //with RPC no way to know the slot of the account update. Set to 0.
); );
BootstrapEvent::AccountsMerged(stake_map, vote_map) BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map)
} }
}); });
BootsrapProcessResult::TaskHandle(jh) BootsrapProcessResult::TaskHandle(jh)
} }
BootstrapEvent::AccountsMerged(stake_map, vote_map) => { BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map) => {
log::info!("BootstrapEvent::AccountsMerged RECV"); log::info!("BootstrapEvent::AccountsMerged RECV");
match ( match (
merge_stakestore(stakestore, stake_map, data.current_epoch), merge_stakestore(stakestore, stake_map, stake_history),
merge_votestore(votestore, vote_map), merge_votestore(votestore, vote_map),
) { ) {
(Ok(()), Ok(())) => BootsrapProcessResult::End, (Ok(()), Ok(())) => BootsrapProcessResult::End,

View File

@ -1,132 +1,150 @@
use crate::leader_schedule::LeaderScheduleEvent; use crate::leader_schedule::LeaderScheduleEvent;
use crate::Slot; use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_account_decoder::parse_sysvar::SysvarAccountType; use solana_account_decoder::parse_sysvar::SysvarAccountType;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::epoch_info::EpochInfo; use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::sync::Arc;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel; use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot; use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
pub struct Epoch {
pub epoch: u64,
pub slot_index: u64,
pub slots_in_epoch: u64,
pub absolute_slot: Slot,
}
impl Epoch {
pub fn get_next_epoch(&self, current_epoch: &CurrentEpochSlotState) -> Epoch {
let last_slot = current_epoch.epoch_cache.get_last_slot_in_epoch(self.epoch);
current_epoch.epoch_cache.get_epoch_at_slot(last_slot + 1)
}
pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option<u64>) -> EpochInfo {
EpochInfo {
epoch: self.epoch,
slot_index: self.slot_index,
slots_in_epoch: self.slots_in_epoch,
absolute_slot: self.absolute_slot,
block_height,
transaction_count,
}
}
}
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 { pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
let slots_in_epoch = current_epoch.current_epoch.slots_in_epoch; current_epoch.epoch_cache.get_epoch_at_slot(slot).epoch
let epoch_start_slot = current_epoch.current_epoch_start_slot(); }
if slot >= epoch_start_slot {
let slot_distance = (slot - epoch_start_slot) / slots_in_epoch; #[derive(Clone, Debug)]
current_epoch.current_epoch.epoch + slot_distance pub struct EpochCache {
} else { epoch_schedule: Arc<EpochSchedule>,
let slot_distance = (epoch_start_slot - slot) / slots_in_epoch; }
current_epoch.current_epoch.epoch - slot_distance
impl EpochCache {
pub fn get_epoch_at_slot(&self, slot: Slot) -> Epoch {
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
let slots_in_epoch = self.epoch_schedule.get_slots_in_epoch(epoch);
Epoch {
epoch,
slot_index,
slots_in_epoch,
absolute_slot: slot,
}
}
// pub fn get_epoch_shedule(&self) -> EpochSchedule {
// self.epoch_schedule.as_ref().clone()
// }
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_schedule.get_slots_in_epoch(epoch)
}
// pub fn get_first_slot_in_epoch(&self, epoch: u64) -> u64 {
// self.epoch_schedule.get_first_slot_in_epoch(epoch)
// }
pub fn get_last_slot_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_schedule.get_last_slot_in_epoch(epoch)
}
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
let Some(SysvarAccountType::EpochSchedule(epoch_schedule)) =
bincode::deserialize(&res_epoch.data[..])
.ok()
.map(SysvarAccountType::EpochSchedule)
else {
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
};
Ok(EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
})
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CurrentEpochSlotState { pub struct CurrentEpochSlotState {
pub current_slot: CurrentSlot, pub current_slot: CurrentSlot,
pub current_epoch: EpochInfo, epoch_cache: EpochCache,
pub next_epoch_start_slot: Slot, current_epoch_value: Epoch,
pub first_epoch_slot: bool,
} }
impl CurrentEpochSlotState { impl CurrentEpochSlotState {
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> { // pub fn get_epoch_shedule(&self) -> EpochSchedule {
// self.epoch_cache.get_epoch_shedule()
// }
pub fn get_current_epoch(&self) -> Epoch {
self.current_epoch_value
}
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_cache.get_slots_in_epoch(epoch)
}
pub async fn bootstrap(rpc_url: String) -> anyhow::Result<CurrentEpochSlotState> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized()); let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
//get reduce_stake_warmup_cooldown feature info. let epoch_cache = EpochCache::bootstrap_epoch(&rpc_client).await?;
//NOT AND ACCOUNT. Get from config.
// let reduce_stake_warmup_cooldown_epoch = rpc_client
// .get_account(&feature_set::reduce_stake_warmup_cooldown::id())
// .await?;
// let reduce_stake_warmup_cooldown_epoch = bincode::deserialize(&reduce_stake_warmup_cooldown_epoch.data[..])
// .ok()
// .map(SysvarAccountType::EpochSchedule);
// log::info!("reduce_stake_warmup_cooldown_epoch {reduce_stake_warmup_cooldown_epoch:?}");
//get epoch sysvar account to init epoch. More compatible with a snapshot bootstrap.
//sysvar_epoch_schedule Some(EpochSchedule(EpochSchedule { slots_per_epoch: 100, leader_schedule_slot_offset: 100, warmup: false, first_normal_epoch: 0, first_normal_slot: 0 }))
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
let sysvar_epoch_schedule = bincode::deserialize(&res_epoch.data[..])
.ok()
.map(SysvarAccountType::EpochSchedule);
log::info!("sysvar_epoch_schedule {sysvar_epoch_schedule:?}");
// Fetch current epoch
let current_epoch = rpc_client.get_epoch_info().await?;
let next_epoch_start_slot =
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
Ok(CurrentEpochSlotState { Ok(CurrentEpochSlotState {
current_slot: Default::default(), current_slot: CurrentSlot::default(),
current_epoch, epoch_cache,
next_epoch_start_slot, current_epoch_value: Epoch::default(),
first_epoch_slot: false,
}) })
} }
pub fn current_epoch_start_slot(&self) -> Slot { // pub fn current_epoch_start_slot(&self) -> Slot {
self.current_epoch.absolute_slot - self.current_epoch.slot_index // self.epoch_cache
} // .get_first_slot_in_epoch(self.current_slot.confirmed_slot)
// }
pub fn current_epoch_end_slot(&self) -> Slot { pub fn current_epoch_end_slot(&self) -> Slot {
self.next_epoch_start_slot - 1 self.epoch_cache
.get_last_slot_in_epoch(self.current_epoch_value.epoch)
} }
pub fn process_new_slot( pub fn process_new_slot(
&mut self, &mut self,
new_slot: &SubscribeUpdateSlot, new_slot: &SubscribeUpdateSlot,
) -> Option<LeaderScheduleEvent> { ) -> Option<LeaderScheduleEvent> {
if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
//for the first update of slot correct epoch info data.
if self.current_slot.confirmed_slot == 0 {
let diff = new_slot.slot - self.current_epoch.absolute_slot;
self.current_epoch.slot_index += diff;
self.current_epoch.absolute_slot = new_slot.slot;
log::trace!(
"Set current epoch with diff:{diff} slot:{} current:{:?}",
new_slot.slot,
self.current_epoch,
);
//if new slot update slot related state data.
} else if new_slot.slot > self.current_slot.confirmed_slot {
//update epoch info
let mut diff = new_slot.slot - self.current_slot.confirmed_slot;
//First epoch slot, index is 0 so remove 1 from diff.
if self.first_epoch_slot {
//calculate next epoch data
self.current_epoch.epoch += 1;
//slot can be non consecutif, use diff.
self.current_epoch.slot_index = 0;
self.current_epoch.absolute_slot = new_slot.slot;
log::info!(
"change_epoch calculated next epoch:{:?} at slot:{}",
self.current_epoch,
new_slot.slot,
);
self.first_epoch_slot = false;
//slot_index start at 0.
diff -= 1; //diff is always >= 1
self.next_epoch_start_slot =
self.next_epoch_start_slot + self.current_epoch.slots_in_epoch;
//set to next epochs.
} else {
self.current_epoch.absolute_slot = new_slot.slot;
}
self.current_epoch.slot_index += diff;
log::trace!(
"Slot epoch with slot:{} , diff:{diff} current epoch state:{self:?}",
new_slot.slot
);
}
}
//update slot state for all commitment.
self.current_slot.update_slot(&new_slot); self.current_slot.update_slot(&new_slot);
if let GeyserCommitmentLevel::Confirmed = new_slot.status() { if let GeyserCommitmentLevel::Confirmed = new_slot.status() {
if self.current_epoch_value.epoch == 0 {
//init first epoch
self.current_epoch_value = self.epoch_cache.get_epoch_at_slot(new_slot.slot);
}
self.manage_change_epoch() self.manage_change_epoch()
} else { } else {
None None
@ -134,22 +152,17 @@ impl CurrentEpochSlotState {
} }
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> { fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
//we change the epoch at the last slot of the current epoch. if self.current_slot.confirmed_slot > self.current_epoch_end_slot() {
if self.current_slot.confirmed_slot >= self.current_epoch_end_slot() { log::info!("Change epoch at slot:{}", self.current_slot.confirmed_slot);
log::info!(
"End epoch slot detected:{}",
self.current_slot.confirmed_slot
);
//set epoch change effectif at next slot.
self.first_epoch_slot = true;
self.current_epoch_value = self.get_current_epoch().get_next_epoch(&self);
//start leader schedule calculus //start leader schedule calculus
//switch to 2 next epoch to calculate schedule at next epoch.
//at current epoch change the schedule is calculated for the next epoch. //at current epoch change the schedule is calculated for the next epoch.
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&self.current_epoch); Some(crate::leader_schedule::create_schedule_init_event(
let schedule_epoch = crate::leader_schedule::next_schedule_epoch(&schedule_epoch); self.current_epoch_value.epoch,
Some(LeaderScheduleEvent::InitLeaderschedule(schedule_epoch)) self.get_slots_in_epoch(self.current_epoch_value.epoch),
Arc::clone(&self.epoch_cache.epoch_schedule),
))
} else { } else {
None None
} }

View File

@ -1,15 +1,19 @@
use crate::epoch::CurrentEpochSlotState;
use crate::epoch::Epoch;
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore}; use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::StoredVote;
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore}; use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use anyhow::bail;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient; use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule; use solana_ledger::leader_schedule::LeaderSchedule;
use solana_program::sysvar::epoch_schedule::EpochSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo; use solana_sdk::feature_set::FeatureSet;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::StakeActivationStatus;
use solana_sdk::stake_history::StakeHistory; use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashMap; use std::collections::HashMap;
@ -34,75 +38,69 @@ pub struct CalculatedSchedule {
#[derive(Debug)] #[derive(Debug)]
pub struct LeaderScheduleData { pub struct LeaderScheduleData {
pub schedule: Arc<HashMap<String, Vec<usize>>>, pub schedule: Arc<HashMap<String, Vec<usize>>>,
pub vote_stakes: Vec<(Pubkey, u64)>, pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub epoch: u64, pub epoch: u64,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct EpochStake { pub struct EpochStake {
epoch: u64, epoch: u64,
stakes: Vec<(Pubkey, u64)>, stake_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
} }
impl From<SavedStake> for EpochStake { impl From<SavedStake> for EpochStake {
fn from(saved_stakes: SavedStake) -> Self { fn from(saved_stakes: SavedStake) -> Self {
EpochStake { EpochStake {
epoch: saved_stakes.epoch, epoch: saved_stakes.epoch,
stakes: saved_stakes stake_vote_map: saved_stakes
.stakes .stake_vote_map
.into_iter() .into_iter()
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), st)) .map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
.collect(), .collect(),
} }
} }
} }
pub fn next_schedule_epoch(current_epoch: &EpochInfo) -> EpochInfo { #[derive(Debug, Default, Clone, Serialize, Deserialize)]
let mut next_epoch_info = current_epoch.clone(); struct SavedStake {
next_epoch_info.epoch += 1; epoch: u64,
next_epoch_info.slot_index = 0; stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
next_epoch_info.absolute_slot =
current_epoch.absolute_slot + current_epoch.slots_in_epoch - current_epoch.slot_index;
next_epoch_info
} }
pub fn bootstrap_leader_schedule( pub fn bootstrap_leader_schedule(
current_file_patch: &str, current_file_patch: &str,
next_file_patch: &str, next_file_patch: &str,
slots_in_epoch: u64, epoch_state: &CurrentEpochSlotState,
) -> anyhow::Result<CalculatedSchedule> { ) -> anyhow::Result<CalculatedSchedule> {
let mut current_stakes = read_schedule_vote_stakes(current_file_patch)?; let current_stakes = read_schedule_vote_stakes(current_file_patch)?;
let mut next_stakes = read_schedule_vote_stakes(next_file_patch)?; let next_stakes = read_schedule_vote_stakes(next_file_patch)?;
//calcualte leader schedule for all vote stakes. //calcualte leader schedule for all vote stakes.
let current_schedule = calculate_leader_schedule( let current_schedule = calculate_leader_schedule(
&mut current_stakes.stakes, &current_stakes.stake_vote_map,
current_stakes.epoch, current_stakes.epoch,
slots_in_epoch, epoch_state.get_slots_in_epoch(current_stakes.epoch),
)?; );
let next_schedule = let next_schedule = calculate_leader_schedule(
calculate_leader_schedule(&mut next_stakes.stakes, next_stakes.epoch, slots_in_epoch)?; &next_stakes.stake_vote_map,
next_stakes.epoch,
epoch_state.get_slots_in_epoch(next_stakes.epoch),
);
Ok(CalculatedSchedule { Ok(CalculatedSchedule {
current: Some(LeaderScheduleData { current: Some(LeaderScheduleData {
schedule: Arc::new(current_schedule), schedule: Arc::new(current_schedule),
vote_stakes: current_stakes.stakes, vote_stakes: current_stakes.stake_vote_map,
epoch: current_stakes.epoch, epoch: epoch_state.get_slots_in_epoch(current_stakes.epoch),
}), }),
next: Some(LeaderScheduleData { next: Some(LeaderScheduleData {
schedule: Arc::new(next_schedule), schedule: Arc::new(next_schedule),
vote_stakes: next_stakes.stakes, vote_stakes: next_stakes.stake_vote_map,
epoch: next_stakes.epoch, epoch: epoch_state.get_slots_in_epoch(next_stakes.epoch),
}), }),
}) })
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct SavedStake {
epoch: u64,
stakes: Vec<(String, u64)>,
}
fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> { fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
let content = std::fs::read_to_string(file_path)?; let content = std::fs::read_to_string(file_path)?;
let stakes_str: SavedStake = serde_json::from_str(&content)?; let stakes_str: SavedStake = serde_json::from_str(&content)?;
@ -112,16 +110,16 @@ fn read_schedule_vote_stakes(file_path: &str) -> anyhow::Result<EpochStake> {
fn save_schedule_vote_stakes( fn save_schedule_vote_stakes(
base_file_path: &str, base_file_path: &str,
stakes: &Vec<(Pubkey, u64)>, stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64, epoch: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
//save new schedule for restart. //save new schedule for restart.
let filename = format!("{base_file_path}_{epoch}.json",); let filename = format!("{base_file_path}_{}.json", epoch);
let save_stakes = SavedStake { let save_stakes = SavedStake {
epoch, epoch,
stakes: stakes stake_vote_map: stake_vote_map
.iter() .iter()
.map(|(pk, st)| (pk.to_string(), *st)) .map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
.collect(), .collect(),
}; };
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap(); let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
@ -157,11 +155,8 @@ pub fn run_leader_schedule_events(
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>, bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
stakestore: &mut StakeStore, stakestore: &mut StakeStore,
votestore: &mut VoteStore, votestore: &mut VoteStore,
) -> Option<( ) -> Option<LeaderScheduleData> {
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>, let result = process_leadershedule_event(event, stakestore, votestore);
EpochInfo,
)> {
let result = process_leadershedule_event(rpc_url.clone(), event, stakestore, votestore);
match result { match result {
LeaderScheduleResult::TaskHandle(jh) => { LeaderScheduleResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh); bootstrap_tasks.push(jh);
@ -170,132 +165,155 @@ pub fn run_leader_schedule_events(
LeaderScheduleResult::Event(event) => { LeaderScheduleResult::Event(event) => {
run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore) run_leader_schedule_events(rpc_url, event, bootstrap_tasks, stakestore, votestore)
} }
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)), LeaderScheduleResult::End(schedule) => Some(schedule),
} }
} }
// struct LeaderScheduleEpochData {
// current_epoch: u64,
// next_epoch: u64,
// slots_in_epoch: u64,
// epoch_schedule: Arc<EpochSchedule>,
// }
pub enum LeaderScheduleEvent { pub enum LeaderScheduleEvent {
InitLeaderschedule(EpochInfo), Init(u64, u64, Arc<EpochSchedule>),
CalculateScedule(StakeMap, VoteMap, EpochInfo, Option<StakeHistory>),
MergeStoreAndSaveSchedule( MergeStoreAndSaveSchedule(
StakeMap, StakeMap,
VoteMap, VoteMap,
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>, LeaderScheduleData,
EpochInfo, (u64, u64, Arc<EpochSchedule>),
Option<StakeHistory>,
), ),
} }
enum LeaderScheduleResult { enum LeaderScheduleResult {
TaskHandle(JoinHandle<LeaderScheduleEvent>), TaskHandle(JoinHandle<LeaderScheduleEvent>),
Event(LeaderScheduleEvent), Event(LeaderScheduleEvent),
End( End(LeaderScheduleData),
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>, }
EpochInfo,
), pub fn create_schedule_init_event(
current_epoch: u64,
slots_in_epoch: u64,
epoch_schedule: Arc<EpochSchedule>,
) -> LeaderScheduleEvent {
log::info!("LeaderScheduleEvent::InitLeaderschedule START");
//TODO get a way to be updated of stake history.
//request the current state using RPC.
//TODO remove the await from the scheduler task.
// let before_stake_histo = stake_history.clone();
// let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
// .map(|account| crate::stakestore::read_historystake_from_account(account))
// .unwrap_or_else(|_| {
// log::error!("Error during stake history fetch. Use bootstrap one");
// stake_history.map(|st| st.as_ref().clone())
// });
// log::info!(
// "stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
// epoch_data.current_epoch
// );
LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule)
} }
//TODO remove desactivated account after leader schedule calculus. //TODO remove desactivated account after leader schedule calculus.
fn process_leadershedule_event( fn process_leadershedule_event(
rpc_url: String, // rpc_url: String,
event: LeaderScheduleEvent, event: LeaderScheduleEvent,
stakestore: &mut StakeStore, stakestore: &mut StakeStore,
votestore: &mut VoteStore, votestore: &mut VoteStore,
) -> LeaderScheduleResult { ) -> LeaderScheduleResult {
match event { match event {
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => { LeaderScheduleEvent::Init(current_epoch, slots_in_epoch, epoch_schedule) => {
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
//For test TODO put in extract and restore process to avoid to clone.
let stake_history = stakestore.get_cloned_stake_history();
//TODO get a way to be updated of stake history.
//request the current state using RPC.
//TODO remove the await from the scheduler task.
let before_stake_histo = stake_history.clone();
let stake_history = crate::bootstrap::get_stakehistory_account(rpc_url)
.map(|account| crate::stakestore::read_historystake_from_account(account))
.unwrap_or_else(|_| {
log::error!("Error during stake history fetch. Use bootstrap one");
stake_history
});
log::info!(
"stake_hsitorique epoch{} before:{before_stake_histo:?} after:{stake_history:?}",
schedule_epoch.epoch
);
match (extract_stakestore(stakestore), extract_votestore(votestore)) { match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => { (Ok((stake_map, mut stake_history)), Ok(vote_map)) => {
LeaderScheduleResult::Event(LeaderScheduleEvent::CalculateScedule( //For test TODO put in extract and restore process to avoid to clone.
stake_map, log::info!("LeaderScheduleEvent::CalculateScedule");
vote_map, let epoch_schedule = Arc::clone(&epoch_schedule);
schedule_epoch, let jh = tokio::task::spawn_blocking({
stake_history, move || {
)) let next_epoch = current_epoch + 1;
} let epoch_vote_stakes = calculate_epoch_stakes(
_ => { &stake_map,
log::warn!("process_leadershedule_event error during extract store"); &vote_map,
let jh = tokio::spawn(async move { current_epoch,
tokio::time::sleep(Duration::from_secs(1)).await; next_epoch,
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) stake_history.as_mut(),
&epoch_schedule,
);
let leader_schedule = calculate_leader_schedule(
&epoch_vote_stakes,
next_epoch,
slots_in_epoch,
);
// let epoch_vote_stakes_old = calculate_epoch_stakes_old_algo(
// &stake_map,
// &vote_map,
// &epoch_state.next_epoch,
// stake_history.as_ref(),
// );
// let leader_schedule_old =
// calculate_leader_schedule(&epoch_vote_stakes_old, &next_epoch);
if let Err(err) = save_schedule_vote_stakes(
SCHEDULE_STAKE_BASE_FILE_NAME,
&epoch_vote_stakes,
next_epoch,
) {
log::error!(
"Error during saving in file of the new schedule of epoch:{} error:{err}",
next_epoch
);
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
LeaderScheduleData {
schedule: Arc::new(leader_schedule),
vote_stakes: epoch_vote_stakes,
epoch: next_epoch,
},
(current_epoch, slots_in_epoch, epoch_schedule),
stake_history,
)
}
}); });
LeaderScheduleResult::TaskHandle(jh) LeaderScheduleResult::TaskHandle(jh)
} }
} _ => {
} log::error!("Create leadershedule init event error during extract store");
LeaderScheduleEvent::CalculateScedule( LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
stake_map, current_epoch,
vote_map, slots_in_epoch,
schedule_epoch, epoch_schedule,
stake_history, ))
) => {
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
let jh = tokio::task::spawn_blocking({
move || {
let schedule_and_stakes =
crate::leader_schedule::calculate_leader_schedule_from_stake_map(
&stake_map,
&vote_map,
&schedule_epoch,
stake_history.as_ref(),
);
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
if let Err(err) = save_schedule_vote_stakes(
SCHEDULE_STAKE_BASE_FILE_NAME,
vote_stakes,
schedule_epoch.epoch,
) {
log::error!(
"Error during saving in file of the new schedule of epoch:{} error:{err}",
schedule_epoch.epoch
);
}
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
schedule_and_stakes,
schedule_epoch,
)
} }
}); }
LeaderScheduleResult::TaskHandle(jh)
} }
LeaderScheduleEvent::MergeStoreAndSaveSchedule( LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map, stake_map,
vote_map, vote_map,
schedule, schedule_data,
schedule_epoch, (current_epoch, slots_in_epoch, epoch_schedule),
stake_history,
) => { ) => {
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV"); log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
match ( match (
merge_stakestore(stakestore, stake_map, schedule_epoch.epoch), merge_stakestore(stakestore, stake_map, stake_history),
merge_votestore(votestore, vote_map), merge_votestore(votestore, vote_map),
) { ) {
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule, schedule_epoch), (Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
_ => { _ => {
//this shoud never arrive because the store has been extracted before. //this shoud never arrive because the store has been extracted before.
//TODO remove this error using type state //TODO remove this error using type state
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule"); log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
LeaderScheduleResult::Event(LeaderScheduleEvent::InitLeaderschedule( LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
schedule_epoch, current_epoch,
slots_in_epoch,
epoch_schedule,
)) ))
} }
} }
@ -303,65 +321,68 @@ fn process_leadershedule_event(
} }
} }
// fn calculate_epoch_stakes(stake_map: &StakeMap) { fn calculate_epoch_stakes(
// let stake_history_entry = stake_map: &StakeMap,
vote_map: &VoteMap,
current_epoch: u64,
next_epoch: u64,
mut stake_history: Option<&mut StakeHistory>,
epoch_schedule: &EpochSchedule,
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
//update stake history with current end epoch stake values.
let new_rate_activation_epoch =
FeatureSet::default().new_warmup_cooldown_rate_epoch(epoch_schedule);
let stake_history_entry =
stake_map
.values()
.fold(StakeActivationStatus::default(), |acc, stake_account| {
let delegation = stake_account.stake;
acc + delegation.stake_activating_and_deactivating(
current_epoch,
stake_history.as_deref(),
new_rate_activation_epoch,
)
});
match stake_history {
Some(ref mut stake_history) => stake_history.add(current_epoch, stake_history_entry),
None => log::warn!("Vote stake calculus without Stake History"),
};
// let stake_delegations: Vec<_> = self.stake_delegations.values().collect(); //calculate schedule stakes.
// // Wrap up the prev epoch by adding new stake history entry for the let delegated_stakes: HashMap<Pubkey, u64> =
// // prev epoch. stake_map
// let stake_history_entry = thread_pool.install(|| { .values()
// stake_delegations .fold(HashMap::default(), |mut delegated_stakes, stake_account| {
// .par_iter() let delegation = stake_account.stake;
// .fold(StakeActivationStatus::default, |acc, stake_account| { let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
// let delegation = stake_account.delegation(); *entry += delegation.stake(
// acc + delegation.stake_activating_and_deactivating( next_epoch,
// self.epoch, stake_history.as_deref(),
// Some(&self.stake_history), new_rate_activation_epoch,
// new_rate_activation_epoch, );
// ) delegated_stakes
// }) });
// .reduce(StakeActivationStatus::default, Add::add)
// });
// self.stake_history.add(self.epoch, stake_history_entry);
// self.epoch = next_epoch;
// // Refresh the stake distribution of vote accounts for the next epoch,
// // using new stake history.
// let delegated_stakes = thread_pool.install(|| {
// stake_delegations
// .par_iter()
// .fold(HashMap::default, |mut delegated_stakes, stake_account| {
// let delegation = stake_account.delegation();
// let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
// *entry += delegation.stake(
// self.epoch,
// Some(&self.stake_history),
// new_rate_activation_epoch,
// );
// delegated_stakes
// })
// .reduce(HashMap::default, merge)
// });
// self.vote_accounts = self
// .vote_accounts
// .iter()
// .map(|(&vote_pubkey, vote_account)| {
// let delegated_stake = delegated_stakes
// .get(&vote_pubkey)
// .copied()
// .unwrap_or_default();
// (vote_pubkey, (delegated_stake, vote_account.clone()))
// })
// .collect();
// } let staked_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)> = vote_map
.values()
.map(|vote_account| {
let delegated_stake = delegated_stakes
.get(&vote_account.pubkey)
.copied()
.unwrap_or_default();
(vote_account.pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
staked_vote_map
}
fn calculate_leader_schedule_from_stake_map( fn calculate_epoch_stakes_old_algo(
stake_map: &crate::stakestore::StakeMap, stake_map: &StakeMap,
vote_map: &crate::votestore::VoteMap, vote_map: &VoteMap,
current_epoch_info: &EpochInfo, next_epoch: &Epoch,
stake_history: Option<&StakeHistory>, stake_history: Option<&StakeHistory>,
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> { ) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
let mut stakes = HashMap::<Pubkey, u64>::new(); let mut stakes = HashMap::<Pubkey, (u64, Arc<StoredVote>)>::new();
log::info!( log::info!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}", "calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
vote_map.len(), vote_map.len(),
@ -369,7 +390,7 @@ fn calculate_leader_schedule_from_stake_map(
stake_history.as_ref().map(|h| h.len()) stake_history.as_ref().map(|h| h.len())
); );
let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch; let ten_epoch_slot_long = 10 * next_epoch.slots_in_epoch;
//log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}"); //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
for storestake in stake_map.values() { for storestake in stake_map.values() {
@ -392,47 +413,149 @@ fn calculate_leader_schedule_from_stake_map(
"leader schedule vote:{} with None root_slot, add it", "leader schedule vote:{} with None root_slot, add it",
vote_account.pubkey vote_account.pubkey
); );
current_epoch_info.absolute_slot next_epoch.absolute_slot
}) < current_epoch_info }) < next_epoch.absolute_slot.saturating_sub(ten_epoch_slot_long)
.absolute_slot
.saturating_sub(ten_epoch_slot_long)
{ {
log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule." log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
, storestake.stake.voter_pubkey , storestake.stake.voter_pubkey
,vote_account.vote_data.node_pubkey ,vote_account.vote_data.node_pubkey
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature. //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
, storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)), , storestake.stake.stake(next_epoch.epoch, stake_history, Some(0)),
); );
} else { } else {
let effective_stake = storestake let effective_stake = storestake
.stake .stake
//TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature. //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
.stake(current_epoch_info.epoch, stake_history, Some(0)); .stake(next_epoch.epoch, stake_history, Some(0));
//only vote account with positive stake are use for the schedule.
if effective_stake > 0 { stakes
*(stakes .entry(vote_account.pubkey)
.entry(vote_account.vote_data.node_pubkey) .or_insert((0, vote_account.clone()))
.or_insert(0)) += effective_stake; .0 += effective_stake;
} else {
log::info!( // //only vote account with positive stake are use for the schedule.
"leader schedule vote:{} with 0 effective vote", // if effective_stake > 0 {
storestake.stake.voter_pubkey // *(stakes
); // .entry(vote_account.vote_data.node_pubkey)
} // .or_insert(0)) += effective_stake;
// } else {
// log::info!(
// "leader schedule vote:{} with 0 effective vote",
// storestake.stake.voter_pubkey
// );
// }
} }
} }
stakes
let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
schedule_stakes.extend(stakes.drain());
let leader_schedule = calculate_leader_schedule(
&mut schedule_stakes,
current_epoch_info.epoch,
current_epoch_info.slots_in_epoch,
)?;
Ok((leader_schedule, schedule_stakes))
} }
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
fn calculate_leader_schedule(
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
slots_in_epoch: u64,
) -> HashMap<String, Vec<usize>> {
let mut stakes: Vec<(Pubkey, u64)> = stake_vote_map
.iter()
.filter_map(|(pk, (stake, _))| (*stake > 0).then_some((*pk, *stake)))
.collect();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(&mut stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
let slot_schedule = schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect();
slot_schedule
}
// fn calculate_leader_schedule_from_stake_map(
// stake_map: &crate::stakestore::StakeMap,
// vote_map: &crate::votestore::VoteMap,
// current_epoch_info: &EpochInfo,
// stake_history: Option<&StakeHistory>,
// ) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
// let mut stakes = HashMap::<Pubkey, u64>::new();
// log::info!(
// "calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{} history len:{:?}",
// vote_map.len(),
// stake_map.len(),
// stake_history.as_ref().map(|h| h.len())
// );
// let ten_epoch_slot_long = 10 * current_epoch_info.slots_in_epoch;
// //log::trace!("calculate_leader_schedule_from_stake_map stake_map:{stake_map:?} current_epoch_info:{current_epoch_info:?}");
// for storestake in stake_map.values() {
// //get nodeid for vote account
// let Some(vote_account) = vote_map.get(&storestake.stake.voter_pubkey) else {
// log::info!(
// "Vote account not found in vote map for stake vote account:{}",
// &storestake.stake.voter_pubkey
// );
// continue;
// };
// //TODO validate the number of epoch.
// //remove vote account that hasn't vote since 10 epoch.
// //on testnet the vote account CY7gjryUPV6Pwbsn4aArkMBL7HSaRHB8sPZUvhw558Tm node_id:6YpwLjgXcMWAj29govWQr87kaAGKS7CnoqWsEDJE4h8P
// //hasn't vote since a long time but still return on RPC call get_voteaccounts.
// //the validator don't use it for leader schedule.
// if vote_account.vote_data.root_slot.unwrap_or_else(|| {
// //vote with no root_slot are added. They have just been activated and can have active stake- TODO TO TEST.
// log::info!(
// "leader schedule vote:{} with None root_slot, add it",
// vote_account.pubkey
// );
// current_epoch_info.absolute_slot
// }) < current_epoch_info
// .absolute_slot
// .saturating_sub(ten_epoch_slot_long)
// {
// log::info!("Vote account:{} nodeid:{} that hasn't vote since 10 epochs. Stake for account:{:?}. Remove leader_schedule."
// , storestake.stake.voter_pubkey
// ,vote_account.vote_data.node_pubkey
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
// , storestake.stake.stake(current_epoch_info.epoch, stake_history, Some(0)),
// );
// } else {
// let effective_stake = storestake
// .stake
// //TODO us the right reduce_stake_warmup_cooldown_epoch value from validator feature.
// .stake(current_epoch_info.epoch, stake_history, Some(0));
// //only vote account with positive stake are use for the schedule.
// if effective_stake > 0 {
// *(stakes
// .entry(vote_account.vote_data.node_pubkey)
// .or_insert(0)) += effective_stake;
// } else {
// log::info!(
// "leader schedule vote:{} with 0 effective vote",
// storestake.stake.voter_pubkey
// );
// }
// }
// }
// let mut schedule_stakes: Vec<(Pubkey, u64)> = vec![];
// schedule_stakes.extend(stakes.drain());
// let leader_schedule = calculate_leader_schedule_old(
// &mut schedule_stakes,
// current_epoch_info.epoch,
// current_epoch_info.slots_in_epoch,
// )?;
// Ok((leader_schedule, schedule_stakes))
// }
// fn is_stake_to_add( // fn is_stake_to_add(
// stake_pubkey: Pubkey, // stake_pubkey: Pubkey,
// stake: &Delegation, // stake: &Delegation,
@ -461,30 +584,30 @@ fn calculate_leader_schedule_from_stake_map(
//Copied from leader_schedule_utils.rs //Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils // Mostly cribbed from leader_schedule_utils
fn calculate_leader_schedule( // fn calculate_leader_schedule_old(
stakes: &mut Vec<(Pubkey, u64)>, // stakes: &mut Vec<(Pubkey, u64)>,
epoch: u64, // epoch: u64,
slots_in_epoch: u64, // slots_in_epoch: u64,
) -> anyhow::Result<HashMap<String, Vec<usize>>> { // ) -> anyhow::Result<HashMap<String, Vec<usize>>> {
if stakes.is_empty() { // if stakes.is_empty() {
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated."); // bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
} // }
let mut seed = [0u8; 32]; // let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes()); // seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(stakes); // sort_stakes(stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}"); // log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS); // let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
let slot_schedule = schedule // let slot_schedule = schedule
.get_slot_leaders() // .get_slot_leaders()
.iter() // .iter()
.enumerate() // .enumerate()
.map(|(i, pk)| (pk.to_string(), i)) // .map(|(i, pk)| (pk.to_string(), i))
.into_group_map() // .into_group_map()
.into_iter() // .into_iter()
.collect(); // .collect();
Ok(slot_schedule) // Ok(slot_schedule)
} // }
// Cribbed from leader_schedule_utils // Cribbed from leader_schedule_utils
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
@ -603,7 +726,7 @@ use solana_sdk::stake::state::StakeState;
pub fn build_current_stakes( pub fn build_current_stakes(
stake_map: &crate::stakestore::StakeMap, stake_map: &crate::stakestore::StakeMap,
stake_history: Option<&StakeHistory>, stake_history: Option<&StakeHistory>,
current_epoch_info: &EpochInfo, current_epoch: u64,
rpc_url: String, rpc_url: String,
commitment: CommitmentConfig, commitment: CommitmentConfig,
) -> BTreeMap<String, (u64, u64)> { ) -> BTreeMap<String, (u64, u64)> {
@ -628,10 +751,9 @@ pub fn build_current_stakes(
match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() { match BorshDeserialize::deserialize(&mut account.data.as_slice()).unwrap() {
StakeState::Stake(_, stake) => { StakeState::Stake(_, stake) => {
//vote account version //vote account version
let effective_stake = let effective_stake = stake
stake .delegation
.delegation .stake(current_epoch, stake_history, Some(0));
.stake(current_epoch_info.epoch, stake_history, Some(0));
if effective_stake > 0 { if effective_stake > 0 {
// Add the stake in this stake account to the total for the delegated-to vote account // Add the stake in this stake account to the total for the delegated-to vote account
log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}"); log::trace!("RPC Stake {pubkey} account:{account:?} stake:{stake:?}");
@ -657,10 +779,7 @@ pub fn build_current_stakes(
stake_map stake_map
.iter() .iter()
.filter_map(|(pubkey, stake)| { .filter_map(|(pubkey, stake)| {
let effective_stake = let effective_stake = stake.stake.stake(current_epoch, stake_history, Some(0));
stake
.stake
.stake(current_epoch_info.epoch, stake_history, Some(0));
(effective_stake > 0).then(|| (pubkey, stake, effective_stake)) (effective_stake > 0).then(|| (pubkey, stake, effective_stake))
}) })
.for_each(|(pubkey, stake, effective_stake)| { .for_each(|(pubkey, stake, effective_stake)| {

View File

@ -48,7 +48,6 @@ use solana_sdk::stake::state::Delegation;
use solana_sdk::vote::state::VoteState; use solana_sdk::vote::state::VoteState;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::sync::Arc;
use tokio::time::Duration; use tokio::time::Duration;
use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel; use yellowstone_grpc_proto::geyser::CommitmentLevel;
@ -169,7 +168,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule( let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
&schedule_current_file, &schedule_current_file,
&schedule_next_file, &schedule_next_file,
current_epoch_state.current_epoch.slots_in_epoch, &current_epoch_state,
) { ) {
Ok(data) => data, Ok(data) => data,
Err(err) => { Err(err) => {
@ -246,8 +245,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//Init bootstrap process //Init bootstrap process
let mut bootstrap_data = BootstrapData { let mut bootstrap_data = BootstrapData {
done: false, done: false,
current_epoch: current_epoch_state.current_epoch.epoch,
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
sleep_time: 1, sleep_time: 1,
rpc_url: RPC_URL.to_string(), rpc_url: RPC_URL.to_string(),
}; };
@ -261,7 +258,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//TODO remove. Store parent hash to see if we don't miss a block. //TODO remove. Store parent hash to see if we don't miss a block.
let mut parent_block_slot = None; let mut parent_block_slot = None;
loop { loop {
tokio::select! { tokio::select! {
Some(req) = request_rx.recv() => { Some(req) = request_rx.recv() => {
@ -270,13 +266,13 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
tokio::task::spawn_blocking({ tokio::task::spawn_blocking({
log::info!("RPC start save_stakes"); log::info!("RPC start save_stakes");
let current_stakes = stakestore.get_cloned_stake_map(); let current_stakes = stakestore.get_cloned_stake_map();
let history = stakestore.get_cloned_stake_history(); let move_epoch = current_epoch_state.get_current_epoch();
let move_epoch = current_epoch_state.current_epoch.clone(); let stake_history = stakestore.get_stake_history();
move || { move || {
let current_stake = crate::leader_schedule::build_current_stakes( let current_stake = crate::leader_schedule::build_current_stakes(
&current_stakes, &current_stakes,
history.as_ref(), stake_history.as_ref(),
&move_epoch, move_epoch.epoch,
RPC_URL.to_string(), RPC_URL.to_string(),
CommitmentConfig::confirmed(), CommitmentConfig::confirmed(),
); );
@ -332,45 +328,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
} }
//Manage RPC call result execution //Manage RPC call result execution
Some(Ok(event)) = spawned_leader_schedule_task.next() => { Some(Ok(event)) = spawned_leader_schedule_task.next() => {
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events( let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
RPC_URL.to_string(), RPC_URL.to_string(),
event, event,
&mut spawned_leader_schedule_task, &mut spawned_leader_schedule_task,
&mut stakestore, &mut stakestore,
&mut votestore, &mut votestore,
) { );
leader_schedule_data.current = leader_schedule_data.next.take();
leader_schedule_data.current = leader_schedule_data.next.take(); leader_schedule_data.next = new_leader_schedule;
match new_schedule {
Ok((new_schedule, new_stakes)) => {
leader_schedule_data.next = Some(LeaderScheduleData {
schedule: Arc::new(new_schedule),
vote_stakes: new_stakes,
epoch: epoch.epoch,
});
},
Err(err) => {
log::error!("Error during new leader schedule generation:{err}");
log::error!("No leader schedule available for next epoch.");
}
}
//TODO remove verification when schedule ok.
//verify calculated shedule with the one the RPC return.
// if let Some(schedule) = new_schedule {
// tokio::task::spawn_blocking(|| {
// //10 second that the schedule has been calculated on the validator
// std::thread::sleep(std::time::Duration::from_secs(5));
// log::info!("Start Verify schedule");
// if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
// log::warn!("Error during schedule verification:{err}");
// }
// log::info!("End Verify schedule");
// });
// }
}
} }
//get confirmed slot or account //get confirmed slot or account
@ -392,7 +358,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
if let Err(err) = stakestore.notify_stake_change( if let Err(err) = stakestore.notify_stake_change(
account, account,
current_epoch_state.current_epoch_end_slot(), current_epoch_state.current_epoch_end_slot(),
current_epoch_state.current_epoch.epoch,
) { ) {
log::warn!("Can't add new stake from account data err:{}", err); log::warn!("Can't add new stake from account data err:{}", err);
continue; continue;

View File

@ -22,7 +22,7 @@ use thiserror::Error;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3000"; const PRIVATE_RPC_ADDRESS: &str = "0.0.0.0:3001";
const SERVER_ERROR_MSG: &str = "Internal server error"; const SERVER_ERROR_MSG: &str = "Internal server error";
//internal RPC access //internal RPC access
@ -88,7 +88,10 @@ pub struct RPCServer {
#[jsonrpsee::core::async_trait] #[jsonrpsee::core::async_trait]
impl ConsensusRpcServer for RPCServer { impl ConsensusRpcServer for RPCServer {
async fn get_latest_blockhash(&self, config: Option<RpcContextConfig>) -> Result<RpcBlockhash> { async fn get_latest_blockhash(
&self,
_config: Option<RpcContextConfig>,
) -> Result<RpcBlockhash> {
todo!() todo!()
} }
@ -137,7 +140,11 @@ pub fn server_rpc_request(
) { ) {
match request { match request {
crate::rpc::Requests::EpochInfo(tx) => { crate::rpc::Requests::EpochInfo(tx) => {
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) { if let Err(err) = tx.send(
current_epoch_state
.get_current_epoch()
.into_epoch_info(0, None),
) {
log::warn!("Channel error during sending back request status error:{err:?}"); log::warn!("Channel error during sending back request status error:{err:?}");
} }
} }
@ -158,11 +165,11 @@ pub fn server_rpc_request(
log::info!( log::info!(
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}", "Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
current_epoch_state.current_epoch.epoch current_epoch_state.get_current_epoch().epoch
); );
//currently only return schedule for current of next epoch. //currently only return schedule for current of next epoch.
let get_schedule_fn = |schedule: &LeaderScheduleData| { let get_schedule_fn = |schedule: &LeaderScheduleData| {
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //Arc clone. (schedule.epoch == epoch).then(|| schedule.schedule.clone()) //clone the schedule. TODO clone arc.
}; };
let schedule = leader_schedules let schedule = leader_schedules
.current .current
@ -285,7 +292,7 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
Err("Error stake store extracted".to_string()) Err("Error stake store extracted".to_string())
} else { } else {
//replace pubkey with String. Json only allow distionary key with string. //replace pubkey with String. Json only allow distionary key with string.
let ret: HashMap<String, StoredVote> = accounts let ret: HashMap<String, Arc<StoredVote>> = accounts
.into_iter() .into_iter()
.map(|(pk, acc)| (pk.to_string(), acc)) .map(|(pk, acc)| (pk.to_string(), acc))
.collect(); .collect();

View File

@ -12,13 +12,14 @@ use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState; use solana_sdk::stake::state::StakeState;
use solana_sdk::stake_history::StakeHistory; use solana_sdk::stake_history::StakeHistory;
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction; use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
pub type StakeMap = HashMap<Pubkey, StoredStake>; pub type StakeMap = HashMap<Pubkey, StoredStake>;
pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMap> { pub fn extract_stakestore(
stakestore: &mut StakeStore,
) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
let new_store = std::mem::take(stakestore); let new_store = std::mem::take(stakestore);
let (new_store, stake_map) = new_store.extract()?; let (new_store, stake_map) = new_store.extract()?;
*stakestore = new_store; *stakestore = new_store;
@ -28,21 +29,15 @@ pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMa
pub fn merge_stakestore( pub fn merge_stakestore(
stakestore: &mut StakeStore, stakestore: &mut StakeStore,
stake_map: StakeMap, stake_map: StakeMap,
current_epoch: u64, stake_history: Option<StakeHistory>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let new_store = std::mem::take(stakestore); let new_store = std::mem::take(stakestore);
let new_store = new_store.merge_stakes(stake_map, current_epoch)?; let new_store = new_store.merge_stakes(stake_map, stake_history)?;
*stakestore = new_store; *stakestore = new_store;
Ok(()) Ok(())
} }
fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch: u64) { fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake) {
//don't add stake that are already desactivated.
//there's some stran,ge stake that has activeate_epock = max epoch and desactivate ecpock 90 that are taken into account.
//Must be better defined.
// if stake.stake.deactivation_epoch < current_epoch {
// return;
// }
//log::info!("stake_map_notify_stake stake:{stake:?}"); //log::info!("stake_map_notify_stake stake:{stake:?}");
match map.entry(stake.pubkey) { match map.entry(stake.pubkey) {
// If value already exists, then increment it by one // If value already exists, then increment it by one
@ -70,7 +65,6 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch:
pub enum ExtractedAction { pub enum ExtractedAction {
Notify { Notify {
stake: StoredStake, stake: StoredStake,
current_epoch: u64,
}, },
Remove(Pubkey, Slot), Remove(Pubkey, Slot),
Merge { Merge {
@ -85,18 +79,12 @@ pub enum ExtractedAction {
impl ExtractedAction { impl ExtractedAction {
fn get_update_slot(&self) -> u64 { fn get_update_slot(&self) -> u64 {
match self { match self {
ExtractedAction::Notify { stake, .. } => stake.last_update_slot, ExtractedAction::Notify { stake } => stake.last_update_slot,
ExtractedAction::Remove(_, slot) => *slot, ExtractedAction::Remove(_, slot) => *slot,
ExtractedAction::Merge { update_slot, .. } => *update_slot, ExtractedAction::Merge { update_slot, .. } => *update_slot,
ExtractedAction::None => 0, ExtractedAction::None => 0,
} }
} }
fn update_epoch(&mut self, epoch: u64) {
if let ExtractedAction::Notify { current_epoch, .. } = self {
*current_epoch = epoch;
}
}
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@ -108,16 +96,6 @@ pub struct StoredStake {
pub write_version: u64, pub write_version: u64,
} }
// impl StoredStake {
// fn is_removed(&self, current_epoch: u64) -> bool {
// self.stake.activation_epoch != crate::leader_schedule::MAX_EPOCH_VALUE
// && self.stake.deactivation_epoch < current_epoch
// }
// fn is_inserted(&self, current_epoch: u64) -> bool {
// self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
// || self.stake.deactivation_epoch >= current_epoch
// }
// }
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct StakeStore { pub struct StakeStore {
stakes: StakeMap, stakes: StakeMap,
@ -136,8 +114,12 @@ impl StakeStore {
} }
} }
pub fn set_stake_history(&mut self, stake_history: StakeHistory) { // pub fn set_stake_history(&mut self, stake_history: StakeHistory) {
self.stake_history = Some(stake_history); // self.stake_history = Some(Arc::new(stake_history));
// }
pub fn get_stake_history(&self) -> Option<StakeHistory> {
self.stake_history.clone()
} }
pub fn nb_stake_account(&self) -> usize { pub fn nb_stake_account(&self) -> usize {
@ -147,48 +129,34 @@ impl StakeStore {
pub fn get_cloned_stake_map(&self) -> StakeMap { pub fn get_cloned_stake_map(&self) -> StakeMap {
self.stakes.clone() self.stakes.clone()
} }
pub fn get_cloned_stake_history(&self) -> Option<StakeHistory> {
self.stake_history.clone()
}
pub fn notify_stake_change( pub fn notify_stake_change(
&mut self, &mut self,
new_account: AccountPretty, account: AccountPretty,
current_end_epoch_slot: Slot, current_end_epoch_slot: Slot,
current_epoch: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let Ok(delegated_stake_opt) = new_account.read_stake() else { //if lamport == 0 the account has been removed.
bail!("Can't read stake from account data"); if account.lamports == 0 {
};
if let Some(delegated_stake) = delegated_stake_opt {
let stake = StoredStake {
pubkey: new_account.pubkey,
lamports: new_account.lamports,
stake: delegated_stake,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
self.notify_stake_action( self.notify_stake_action(
ExtractedAction::Notify { ExtractedAction::Remove(account.pubkey, account.slot),
stake,
current_epoch,
},
current_end_epoch_slot, current_end_epoch_slot,
); );
} else {
let Ok(delegated_stake_opt) = account.read_stake() else {
bail!("Can't read stake from account data");
};
//during extract push the new update or if let Some(delegated_stake) = delegated_stake_opt {
//don't insertnow account change that has been done in next epoch. let stake = StoredStake {
//put in update pool to be merged next epoch change. pubkey: account.pubkey,
// let insert_stake = !self.extracted || ststake.last_update_slot > current_end_epoch_slot; lamports: account.lamports,
// match insert_stake { stake: delegated_stake,
// false => self.updates.push(ExtractedAction::Notify { last_update_slot: account.slot,
// stake_account: new_account.pubkey, write_version: account.write_version,
// stake: ststake, };
// }),
// true => self.notify_stake(new_account.pubkey, ststake, current_epoch), self.notify_stake_action(ExtractedAction::Notify { stake }, current_end_epoch_slot);
// } }
} }
Ok(()) Ok(())
@ -206,11 +174,8 @@ impl StakeStore {
fn process_stake_action(&mut self, action: ExtractedAction) { fn process_stake_action(&mut self, action: ExtractedAction) {
match action { match action {
ExtractedAction::Notify { ExtractedAction::Notify { stake } => {
stake, stake_map_notify_stake(&mut self.stakes, stake);
current_epoch,
} => {
stake_map_notify_stake(&mut self.stakes, stake, current_epoch);
} }
ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot), ExtractedAction::Remove(account_pk, slot) => self.remove_from_store(&account_pk, slot),
//not use currently. TODO remove. //not use currently. TODO remove.
@ -229,27 +194,31 @@ impl StakeStore {
//return the contained stake map to do an external update. //return the contained stake map to do an external update.
// During extract period (between extract and merge) added stake a stored to be processed later. // During extract period (between extract and merge) added stake a stored to be processed later.
//if the store is already extracted return an error. //if the store is already extracted return an error.
fn extract(self) -> anyhow::Result<(Self, StakeMap)> { fn extract(self) -> anyhow::Result<(Self, (StakeMap, Option<StakeHistory>))> {
if self.extracted { if self.extracted {
bail!("StakeStore already extracted. Try later"); bail!("StakeStore already extracted. Try later");
} }
let stakestore = StakeStore { let stakestore = StakeStore {
stakes: HashMap::new(), stakes: HashMap::new(),
stake_history: self.stake_history, stake_history: None,
updates: self.updates, updates: self.updates,
extracted: true, extracted: true,
}; };
Ok((stakestore, self.stakes)) Ok((stakestore, (self.stakes, self.stake_history)))
} }
//Merge the stake map an,d replace the current one. //Merge the stake map an,d replace the current one.
fn merge_stakes(mut self, stakes: StakeMap, current_epoch: u64) -> anyhow::Result<Self> { fn merge_stakes(
mut self,
stakes: StakeMap,
stake_history: Option<StakeHistory>,
) -> anyhow::Result<Self> {
if !self.extracted { if !self.extracted {
bail!("StakeStore merge of non extracted map. Try later"); bail!("StakeStore merge of non extracted map. Try later");
} }
let mut stakestore = StakeStore { let mut stakestore = StakeStore {
stakes, stakes,
stake_history: self.stake_history, stake_history,
updates: vec![], updates: vec![],
extracted: false, extracted: false,
}; };
@ -257,25 +226,9 @@ impl StakeStore {
self.updates self.updates
.sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot())); .sort_unstable_by(|a, b| a.get_update_slot().cmp(&b.get_update_slot()));
//apply stake added during extraction. //apply stake added during extraction.
for mut action in self.updates { for action in self.updates {
action.update_epoch(current_epoch);
stakestore.process_stake_action(action); stakestore.process_stake_action(action);
} }
//verify one stake account to test. TODO remove
let stake_account = stakestore
.stakes
.get(&Pubkey::from_str("2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce").unwrap());
let stake = stake_account.map(|stake| {
stake
.stake
.stake(current_epoch, stakestore.stake_history.as_ref(), Some(0))
});
log::info!(
"merge_stakes 2wAVZS68P6frWqpwMu7q67A3j54RFmBjq4oH94sYi7ce:{:?}",
stake
);
Ok(stakestore) Ok(stakestore)
} }
@ -286,7 +239,7 @@ impl StakeStore {
.map(|stake| stake.last_update_slot <= update_slot) .map(|stake| stake.last_update_slot <= update_slot)
.unwrap_or(true) .unwrap_or(true)
{ {
log::info!("remove_from_store for {}", account_pk.to_string()); log::info!("Stake remove_from_store for {}", account_pk.to_string());
self.stakes.remove(account_pk); self.stakes.remove(account_pk);
} }
} }
@ -296,7 +249,6 @@ pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap, stake_map: &mut StakeMap,
stakes_list: Vec<(Pubkey, Account)>, stakes_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot, last_update_slot: Slot,
current_epoch: u64,
) { ) {
stakes_list stakes_list
.into_iter() .into_iter()
@ -318,7 +270,7 @@ pub fn merge_program_account_in_strake_map(
write_version: 0, write_version: 0,
}; };
stake_map_notify_stake(stake_map, stake, current_epoch); stake_map_notify_stake(stake_map, stake);
}); });
} }
@ -764,21 +716,21 @@ pub async fn process_stake_tx_message(
current_end_epoch_slot, current_end_epoch_slot,
); );
send_verification( // send_verification(
stake_sender, // stake_sender,
stakestore, // stakestore,
"Merge Destination", // "Merge Destination",
account_keys[instruction.accounts[0] as usize], // account_keys[instruction.accounts[0] as usize],
) // )
.await; // .await;
send_verification( // send_verification(
stake_sender, // stake_sender,
stakestore, // stakestore,
"Merge Source", // "Merge Source",
account_keys[instruction.accounts[1] as usize], // account_keys[instruction.accounts[1] as usize],
) // )
.await; // .await;
} }
} }
} }

View File

@ -6,8 +6,9 @@ use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState; use solana_sdk::vote::state::VoteState;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
pub type VoteMap = HashMap<Pubkey, StoredVote>; pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
//Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct //Copy the solana_rpc_client_api::response::RpcVoteAccountInfo struct
//to avoid to add a dep to solana_rpc_client that create //to avoid to add a dep to solana_rpc_client that create
@ -104,32 +105,50 @@ impl VoteStore {
new_account: AccountPretty, new_account: AccountPretty,
current_end_epoch_slot: Slot, current_end_epoch_slot: Slot,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let Ok(vote_data) = new_account.read_vote() else { if new_account.lamports == 0 {
bail!("Can't read Vote from account data"); self.remove_from_store(&new_account.pubkey, new_account.slot);
}; } else {
let Ok(vote_data) = new_account.read_vote() else {
bail!("Can't read Vote from account data");
};
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey); //log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
let new_voteacc = StoredVote { let new_voteacc = StoredVote {
pubkey: new_account.pubkey, pubkey: new_account.pubkey,
vote_data, vote_data,
last_update_slot: new_account.slot, last_update_slot: new_account.slot,
write_version: new_account.write_version, write_version: new_account.write_version,
}; };
//during extract push the new update or //during extract push the new update or
//don't insertnow account change that has been done in next epoch. //don't insertnow account change that has been done in next epoch.
//put in update pool to be merged next epoch change. //put in update pool to be merged next epoch change.
let insert_stake = !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot; let insert_stake =
match insert_stake { !self.extracted || new_voteacc.last_update_slot > current_end_epoch_slot;
false => self.updates.push((new_account.pubkey, new_voteacc)), match insert_stake {
true => self.insert_vote(new_account.pubkey, new_voteacc), false => self.updates.push((new_account.pubkey, new_voteacc)),
true => self.insert_vote(new_account.pubkey, new_voteacc),
}
} }
Ok(()) Ok(())
} }
fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) { fn insert_vote(&mut self, vote_account: Pubkey, vote_data: StoredVote) {
vote_map_insert_vote(&mut self.votes, vote_account, vote_data); vote_map_insert_vote(&mut self.votes, vote_account, vote_data);
} }
fn remove_from_store(&mut self, account_pk: &Pubkey, update_slot: Slot) {
if self
.votes
.get(account_pk)
.map(|vote| vote.last_update_slot <= update_slot)
.unwrap_or(true)
{
log::info!("Vote remove_from_store for {}", account_pk.to_string());
self.votes.remove(account_pk);
}
}
} }
pub fn merge_program_account_in_vote_map( pub fn merge_program_account_in_vote_map(
@ -180,7 +199,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
); );
} }
*voteacc = vote_data; *voteacc = Arc::new(vote_data);
} }
} }
// If value doesn't exist yet, then insert a new value of 1 // If value doesn't exist yet, then insert a new value of 1
@ -190,7 +209,7 @@ fn vote_map_insert_vote(map: &mut VoteMap, vote_account_pk: Pubkey, vote_data: S
vote_data.vote_data.node_pubkey, vote_data.vote_data.node_pubkey,
vote_data.vote_data.root_slot, vote_data.vote_data.root_slot,
); );
vacant.insert(vote_data); vacant.insert(Arc::new(vote_data));
} }
}; };
} }