refactor bootstrap, epoch and leader schedule to use state changes

musitdev 2023-09-13 20:45:29 +02:00
parent 0efb26c01b
commit 0c4f4d1e52
6 changed files with 599 additions and 363 deletions

View File

@ -0,0 +1,207 @@
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use crate::Slot;
use futures_util::stream::FuturesUnordered;
use futures_util::TryFutureExt;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use tokio::task::JoinHandle;
use tokio::time::Duration;
/*
Bootstrap state changes

InitBootstrap
  |Fetch accounts|
    -> Error: |Exit|
    -> BootstrapAccountsFetched(account list)
         |Extract stores|
           -> Error: |Wait(1s)| -> BootstrapAccountsFetched(account list)
           -> StoreExtracted(account list, stores)
                |Merge accounts in store|
                  -> Error: |Log and skip account|
                  -> AccountsMerged(stores)
                       |Merge stores|
                         -> Error: |never occurs, restart| -> InitBootstrap
                         -> End
*/
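// A minimal illustrative driver for the state machine above: a hypothetical
// helper showing how a caller could feed events back in until the machine ends.
// It assumes the stores and `BootstrapData` are already constructed; the actual
// driver lives in the main run_loop and uses tokio::select! instead.
#[allow(dead_code)]
async fn drive_bootstrap(
    stakestore: &mut StakeStore,
    votestore: &mut VoteStore,
    data: &BootstrapData,
) {
    use futures_util::StreamExt;
    let mut bootstrap_tasks = FuturesUnordered::new();
    // Kick the machine off with the initial event.
    run_bootstrap_events(
        BootstrapEvent::InitBootstrap,
        &mut bootstrap_tasks,
        stakestore,
        votestore,
        data,
    );
    // Each finished task yields the next event to feed back in; the loop ends
    // once no spawned task remains (the End state pushes nothing).
    while let Some(Ok(event)) = bootstrap_tasks.next().await {
        run_bootstrap_events(event, &mut bootstrap_tasks, stakestore, votestore, data);
    }
}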
pub fn run_bootstrap_events(
event: BootstrapEvent,
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<BootstrapEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
data: &BootstrapData,
) {
let result = process_bootstrap_event(event, stakestore, votestore, data);
match result {
BootsrapProcessResult::TaskHandle(jh) => bootstrap_tasks.push(jh),
BootsrapProcessResult::Event(event) => {
run_bootstrap_events(event, bootstrap_tasks, stakestore, votestore, data)
}
BootsrapProcessResult::End => (),
}
}
pub enum BootstrapEvent {
InitBootstrap,
BootstrapAccountsFetched(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>),
StoreExtracted(
StakeMap,
VoteMap,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
AccountsMerged(StakeMap, VoteMap),
Exit,
}
enum BootsrapProcessResult {
TaskHandle(JoinHandle<BootstrapEvent>),
Event(BootstrapEvent),
End,
}
pub struct BootstrapData {
pub current_epoch: u64,
pub next_epoch_start_slot: Slot,
pub sleep_time: u64,
pub rpc_url: String,
}
fn process_bootstrap_event(
event: BootstrapEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
data: &BootstrapData,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap => {
let jh = tokio::spawn({
let rpc_url = data.rpc_url.clone();
let sleep_time = data.sleep_time;
async move {
log::info!("BootstrapEvent::InitBootstrap RECV");
if sleep_time > 0 {
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
}
match crate::bootstrap::bootstrap_accounts(rpc_url).await {
Ok((stakes, votes)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes)
}
Err(err) => {
log::warn!(
"Bootstrap account error during fetching accounts err:{err}. Exit"
);
BootstrapEvent::Exit
}
}
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event(
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes),
),
_ => {
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
BootstrapEvent::BootstrapAccountsFetched(stakes, votes)
});
BootsrapProcessResult::TaskHandle(jh)
}
}
}
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes) => {
log::trace!("BootstrapEvent::StoreExtracted RECV");
//merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({
let next_epoch_start_slot = data.next_epoch_start_slot;
let current_epoch = data.current_epoch;
move || {
//update pa_list so each account's update slot is set to the epoch start slot.
crate::stakestore::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
next_epoch_start_slot,
current_epoch,
);
crate::votestore::merge_program_account_in_vote_map(
&mut vote_map,
votes,
next_epoch_start_slot,
);
BootstrapEvent::AccountsMerged(stake_map, vote_map)
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::AccountsMerged(stake_map, vote_map) => {
log::trace!("BootstrapEvent::AccountsMerged RECV");
match (
merge_stakestore(stakestore, stake_map, data.current_epoch),
merge_votestore(votestore, vote_map),
) {
(Ok(()), Ok(())) => BootsrapProcessResult::End,
_ => {
//TODO remove this error using type state
log::warn!("BootstrapEvent::AccountsMerged merge stake or vote fail, non extracted stake/vote map err, restart bootstrap");
BootsrapProcessResult::Event(BootstrapEvent::InitBootstrap)
}
}
}
BootstrapEvent::Exit => panic!("Bootstrap accounts can't be fetched, exiting"),
}
}
async fn bootstrap_accounts(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>), ClientError> {
let future = get_stake_account(rpc_url.clone()).and_then(|stakes| async move {
get_vote_account(rpc_url).await.map(|votes| (stakes, votes))
});
future.await
}
async fn get_stake_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client
.get_program_accounts(&solana_sdk::stake::program::id())
.await;
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake
}
async fn get_vote_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_vote = rpc_client
.get_program_accounts(&solana_sdk::vote::program::id())
.await;
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote
}

View File

@ -0,0 +1,150 @@
use crate::leader_schedule::LeaderScheduleEvent;
use crate::Slot;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
#[derive(Debug)]
pub struct CurrentEpochSlotState {
pub current_slot: CurrentSlot,
pub current_epoch: EpochInfo,
pub next_epoch_start_slot: Slot,
pub first_epoch_slot: bool,
}
impl CurrentEpochSlotState {
pub async fn bootstrap(rpc_url: String) -> Result<CurrentEpochSlotState, ClientError> {
let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::finalized());
// Fetch current epoch
let current_epoch = rpc_client.get_epoch_info().await?;
let next_epoch_start_slot =
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
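// Worked example (illustrative numbers): with slots_in_epoch = 432_000,
// slot_index = 100_000 and absolute_slot = 200_100_000, the current epoch
// started at slot 200_000_000, so the next epoch starts at
// 432_000 - 100_000 + 200_100_000 = 200_432_000.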
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
Ok(CurrentEpochSlotState {
current_slot: Default::default(),
current_epoch,
next_epoch_start_slot,
first_epoch_slot: false,
})
}
pub fn current_epoch_end_slot(&self) -> Slot {
self.next_epoch_start_slot - 1
}
pub fn process_new_slot(
&mut self,
new_slot: &SubscribeUpdateSlot,
) -> Option<LeaderScheduleEvent> {
if let CommitmentLevel::Confirmed = new_slot.status() {
//on the first slot update, correct the epoch info data.
if self.current_slot.confirmed_slot == 0 {
let diff = new_slot.slot - self.current_epoch.absolute_slot;
self.current_epoch.slot_index += diff;
self.current_epoch.absolute_slot = new_slot.slot;
log::trace!(
"Set current epoch with diff:{diff} slot:{} current:{:?}",
new_slot.slot,
self.current_epoch,
);
//on a new slot, update the slot-related state data.
} else if new_slot.slot > self.current_slot.confirmed_slot {
//update epoch info
let mut diff = new_slot.slot - self.current_slot.confirmed_slot;
//First epoch slot, index is 0 so remove 1 from diff.
if self.first_epoch_slot {
change_epoch(&mut self.current_epoch, new_slot.slot);
log::info!(
"change_epoch calculated next epoch:{:?} at slot:{}",
self.current_epoch,
new_slot.slot,
);
self.first_epoch_slot = false;
//slot_index starts at 0.
diff -= 1; //diff is always >= 1
self.next_epoch_start_slot =
self.next_epoch_start_slot + self.current_epoch.slots_in_epoch;
//set to the next epoch.
} else {
self.current_epoch.absolute_slot = new_slot.slot;
}
self.current_epoch.slot_index += diff;
log::trace!(
"Slot epoch with slot:{} , diff:{diff} current epoch state:{self:?}",
new_slot.slot
);
}
}
//update the slot state for all commitment levels.
self.current_slot.update_slot(&new_slot);
if let CommitmentLevel::Confirmed = new_slot.status() {
self.manage_change_epoch()
} else {
None
}
}
fn manage_change_epoch(&mut self) -> Option<LeaderScheduleEvent> {
//we change the epoch at the last slot of the current epoch.
if self.current_slot.confirmed_slot >= self.current_epoch_end_slot() {
log::info!(
"End epoch slot detected:{}",
self.current_slot.confirmed_slot
);
//the epoch change becomes effective at the next slot.
self.first_epoch_slot = true;
//start the leader schedule calculation
//switch to the next epoch to calculate its schedule.
let mut schedule_epoch = self.current_epoch.clone();
change_epoch(&mut schedule_epoch, self.current_slot.confirmed_slot);
Some(LeaderScheduleEvent::InitLeaderschedule(schedule_epoch))
} else {
None
}
}
}
fn change_epoch(current_epoch: &mut EpochInfo, current_slot: Slot) {
current_epoch.epoch += 1;
//slots can be non-consecutive, use the diff.
current_epoch.slot_index = current_epoch
.slot_index
.saturating_sub(current_epoch.slots_in_epoch);
current_epoch.absolute_slot = current_slot;
}
#[derive(Default, Debug, Clone)]
pub struct CurrentSlot {
pub processed_slot: u64,
pub confirmed_slot: u64,
pub finalized_slot: u64,
}
impl CurrentSlot {
fn update_slot(&mut self, slot: &SubscribeUpdateSlot) {
let updade = |commitment: &str, current_slot: &mut u64, new_slot: u64| {
//verify that the slot is consecutive
if *current_slot != 0 && new_slot != *current_slot + 1 {
log::trace!(
"At {commitment} not consecutif slot send: current_slot:{} new_slot{}",
current_slot,
new_slot
);
}
*current_slot = new_slot
};
match slot.status() {
CommitmentLevel::Processed => updade("Processed", &mut self.processed_slot, slot.slot),
CommitmentLevel::Confirmed => updade("Confirmed", &mut self.confirmed_slot, slot.slot),
CommitmentLevel::Finalized => updade("Finalized", &mut self.finalized_slot, slot.slot),
}
}
}

View File

@ -1,4 +1,7 @@
use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeStore};
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use anyhow::bail;
use futures::stream::FuturesUnordered;
use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
@ -9,10 +12,134 @@ use solana_sdk::stake::state::Delegation;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinHandle;
const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
pub fn calculate_leader_schedule_from_stake_map(
/*
Leader schedule calculation state diagram

InitLeaderschedule
  |extract stake and vote stores|
    -> Error: |Wait(1s)| -> InitLeaderschedule
    -> CalculateScedule(stakes, votes)
         |Calculate schedule|
           -> MergeStore(stakes, votes, schedule)
                |merge stores back|
                  -> Error: |never occurs, restart (wait 1s)| -> InitLeaderschedule
                  -> SaveSchedule(schedule)
                       |save schedule and verify (opt)|
*/
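// A minimal illustrative driver for the schedule state machine above, assuming
// the caller owns the stores (hypothetical helper; the real driver is the
// run_loop select! in main.rs). It returns the computed schedule and its epoch
// once the machine reaches its End state.
#[allow(dead_code)]
async fn drive_leader_schedule(
    first_event: LeaderScheduleEvent,
    stakestore: &mut StakeStore,
    votestore: &mut VoteStore,
) -> Option<(Option<LeaderSchedule>, EpochInfo)> {
    use futures::StreamExt;
    let mut schedule_tasks = FuturesUnordered::new();
    // Feed the initial event; spawned tasks produce the follow-up events.
    if let Some(result) =
        run_leader_schedule_events(first_event, &mut schedule_tasks, stakestore, votestore)
    {
        return Some(result);
    }
    while let Some(Ok(event)) = schedule_tasks.next().await {
        if let Some(result) =
            run_leader_schedule_events(event, &mut schedule_tasks, stakestore, votestore)
        {
            return Some(result);
        }
    }
    None
}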
pub fn run_leader_schedule_events(
event: LeaderScheduleEvent,
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> Option<(Option<LeaderSchedule>, EpochInfo)> {
let result = process_leadershedule_event(event, stakestore, votestore);
match result {
LeaderScheduleResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
None
}
LeaderScheduleResult::Event(event) => {
run_leader_schedule_events(event, bootstrap_tasks, stakestore, votestore)
}
LeaderScheduleResult::End(schedule, epoch) => Some((schedule, epoch)),
}
}
pub struct LeaderScheduleData {
pub leader_schedule: Option<LeaderSchedule>,
pub schedule_epoch: EpochInfo,
}
pub enum LeaderScheduleEvent {
InitLeaderschedule(EpochInfo),
CalculateScedule(StakeMap, VoteMap, EpochInfo),
MergeStoreAndSaveSchedule(StakeMap, VoteMap, Option<LeaderSchedule>, EpochInfo),
}
enum LeaderScheduleResult {
TaskHandle(JoinHandle<LeaderScheduleEvent>),
Event(LeaderScheduleEvent),
End(Option<LeaderSchedule>, EpochInfo),
}
fn process_leadershedule_event(
event: LeaderScheduleEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> LeaderScheduleResult {
match event {
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch) => {
log::info!("LeaderScheduleEvent::InitLeaderschedule RECV");
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => LeaderScheduleResult::Event(
LeaderScheduleEvent::CalculateScedule(stake_map, vote_map, schedule_epoch),
),
_ => {
log::warn!("process_leadershedule_event error during extract store");
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
LeaderScheduleEvent::InitLeaderschedule(schedule_epoch)
});
LeaderScheduleResult::TaskHandle(jh)
}
}
}
LeaderScheduleEvent::CalculateScedule(stake_map, vote_map, schedule_epoch) => {
log::info!("LeaderScheduleEvent::CalculateScedule RECV");
let jh = tokio::task::spawn_blocking({
move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(
&stake_map,
&vote_map,
&schedule_epoch,
);
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
schedule.ok(),
schedule_epoch,
)
}
});
LeaderScheduleResult::TaskHandle(jh)
}
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
schedule,
schedule_epoch,
) => {
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
match (
merge_stakestore(stakestore, stake_map, schedule_epoch.epoch),
merge_votestore(votestore, vote_map),
) {
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule, schedule_epoch),
_ => {
//this should never happen because the stores were extracted before.
//TODO remove this error using type state
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule: merging the stake or vote store failed (store not extracted), restarting the schedule");
LeaderScheduleResult::Event(LeaderScheduleEvent::InitLeaderschedule(
schedule_epoch,
))
}
}
}
}
}
fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
current_epoch_info: &EpochInfo,

View File

@ -1,4 +1,5 @@
//RUST_BACKTRACE=1 RUST_LOG=stake_aggregate=trace cargo run --release --bin stake_aggregate
//RUST_BACKTRACE=1 RUST_LOG=stake_aggregate=info cargo run --release --bin stake_aggregate &> stake_logs.txt &
/*
RPC calls;
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
@ -17,26 +18,20 @@
"method": "bootstrap_accounts",
"params": []
}
'
' -o extract_stake2.json
*/
//TODO: verify that the stake is not already deactivated.
use crate::stakestore::extract_stakestore;
use crate::stakestore::merge_stakestore;
use crate::bootstrap::BootstrapData;
use crate::bootstrap::BootstrapEvent;
use crate::leader_schedule::LeaderScheduleData;
use crate::stakestore::StakeStore;
use crate::votestore::extract_votestore;
use crate::votestore::merge_votestore;
use crate::votestore::VoteStore;
use anyhow::bail;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_client::client_error::ClientError;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::vote::state::VoteState;
@ -49,10 +44,12 @@ use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::{
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots, SubscribeUpdateSlot},
prelude::{subscribe_update::UpdateOneof, SubscribeRequestFilterSlots},
tonic::service::Interceptor,
};
mod bootstrap;
mod epoch;
mod leader_schedule;
mod rpc;
mod stakestore;
@ -79,10 +76,10 @@ pub fn log_end_epoch(
msg: String,
) {
//log the last slots before the end of the epoch.
if current_slot != 0 && current_slot + 20 > end_epoch_slot {
if current_slot != 0 && current_slot + 10 > end_epoch_slot {
log::info!("{current_slot}/{end_epoch_slot} {}", msg);
}
if epoch_slot_index < 20 {
if epoch_slot_index < 10 {
log::info!("{current_slot}/{end_epoch_slot} {}", msg);
}
}
@ -114,31 +111,25 @@ async fn main() -> anyhow::Result<()> {
async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Result<()> {
//local vars
let mut current_slot: CurrentSlot = Default::default();
//slot and epoch
let mut current_epoch_state =
epoch::CurrentEpochSlotState::bootstrap(RPC_URL.to_string()).await?;
//Stake account management struct
let mut stakestore = StakeStore::new(STAKESTORE_INITIAL_CAPACITY);
let mut current_epoch = {
let rpc_client =
RpcClient::new_with_commitment(RPC_URL.to_string(), CommitmentConfig::finalized());
// Fetch current epoch
rpc_client.get_epoch_info().await?
//Vote account management struct
let mut votestore = VoteStore::new(VOTESTORE_INITIAL_CAPACITY);
//leader schedule
let mut current_leader_schedule = LeaderScheduleData {
leader_schedule: None,
schedule_epoch: current_epoch_state.current_epoch.clone(),
};
let mut next_epoch_start_slot =
current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
let mut spawned_task_toexec = FuturesUnordered::new();
let mut spawned_task_result = FuturesUnordered::new();
//used to set the initial state of all PA
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(
current_epoch.absolute_slot - current_epoch.slot_index,
0,
)));
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(
current_epoch.absolute_slot - current_epoch.slot_index,
0,
)));
//future execution collection.
let mut spawned_bootstrap_task = FuturesUnordered::new();
let mut spawned_leader_schedule_task = FuturesUnordered::new();
//subscribe Geyser grpc
//slot subscription
@ -154,6 +145,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
owner: vec![
solana_sdk::stake::program::ID.to_string(),
solana_sdk::vote::program::ID.to_string(),
// solana_sdk::system_program::ID.to_string(),
],
filters: vec![],
},
@ -190,17 +182,24 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
)
.await?;
//log current data at interval
let mut log_interval = tokio::time::interval(Duration::from_millis(600000));
//log current data at an interval. TODO: remove, only for tests.
let mut log_interval = tokio::time::interval(Duration::from_millis(10000));
//start local rpc access to execute command.
//start local RPC access to receive RPC requests.
let (request_tx, mut request_rx) = tokio::sync::mpsc::channel(100);
let rpc_handle = crate::rpc::run_server(request_tx).await?;
//make it run forever
tokio::spawn(rpc_handle.stopped());
//Vote account management struct
let mut votestore = VoteStore::new(VOTESTORE_INITIAL_CAPACITY);
//Init bootstrap process
let bootstrap_data = BootstrapData {
current_epoch: current_epoch_state.current_epoch.epoch,
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
sleep_time: 1,
rpc_url: RPC_URL.to_string(),
};
let jh = tokio::spawn(async move { BootstrapEvent::InitBootstrap });
spawned_bootstrap_task.push(jh);
loop {
tokio::select! {
@ -210,9 +209,14 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
tokio::task::spawn_blocking({
log::info!("RPC start save_stakes");
let current_stakes = stakestore.get_cloned_stake_map();
let move_epoch = current_epoch.clone();
let move_epoch = current_epoch_state.current_epoch.clone();
move || {
let current_stake = crate::leader_schedule::build_current_stakes(&current_stakes, &move_epoch, RPC_URL.to_string(), CommitmentConfig::confirmed());
let current_stake = crate::leader_schedule::build_current_stakes(
&current_stakes,
&move_epoch,
RPC_URL.to_string(),
CommitmentConfig::confirmed(),
);
log::info!("RPC save_stakes generation done");
if let Err(err) = crate::leader_schedule::save_schedule_on_file("stakes", &current_stake) {
log::error!("Error during current stakes saving:{err}");
@ -225,7 +229,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
crate::rpc::Requests::BootstrapAccounts(tx) => {
log::info!("RPC start save_stakes");
let current_stakes = stakestore.get_cloned_stake_map();
if let Err(err) = tx.send((current_stakes, current_slot.confirmed_slot)){
if let Err(err) = tx.send((current_stakes, current_epoch_state.current_slot.confirmed_slot)){
println!("Channel error during sending bacl request status error:{err:?}");
}
log::info!("RPC bootstrap account send");
@ -233,201 +237,42 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
},
//log interval
//log interval TODO remove
_ = log_interval.tick() => {
log::info!("Run_loop update new epoch:{current_epoch:?} current slot:{current_slot:?} next epoch start slot:{next_epoch_start_slot}");
log::info!("Change epoch equality {} >= {}", current_slot.confirmed_slot, next_epoch_start_slot-1);
log::info!("Run_loop update new epoch:{current_epoch_state:?}");
log::info!("Change epoch equality {} >= {}", current_epoch_state.current_slot.confirmed_slot, current_epoch_state.current_epoch_end_slot());
log::info!("number of stake accounts:{}", stakestore.nb_stake_account());
}
//Execute RPC call in another task
Some(to_exec) = spawned_task_toexec.next() => {
let jh = tokio::spawn(async move {
match to_exec {
TaskToExec::RpcGetStakeAccount(epoch_start_slot, sleep_time) => {
if sleep_time > 0 {
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
}
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized());
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await;
log::info!("TaskToExec RpcGetStakeAccount END");
TaskResult::RpcGetStakeAccount(res_stake, epoch_start_slot)
},
TaskToExec::RpcGetVoteAccount(epoch_start_slot, sleep_time) => {
if sleep_time > 0 {
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
}
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized());
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id()).await;
log::info!("TaskToExec RpcGetVoteAccount END");
TaskResult::RpcGetVoteAccount(res_vote, epoch_start_slot)
},
TaskToExec::RpcGetCurrentEpoch => {
//TODO remove no need epoch is calculated.
log::info!("TaskToExec RpcGetCurrentEpoch start");
//wait 1 sec to be sure RPC change epoch
tokio::time::sleep(Duration::from_secs(1)).await;
let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized());
let res = rpc_client.get_epoch_info().await;
TaskResult::CurrentEpoch(res)
}
}
});
spawned_task_result.push(jh);
//exec bootstrap task
Some(Ok(event)) = spawned_bootstrap_task.next() => {
crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, &bootstrap_data);
}
//Manage RPC call result execution
Some(some_res) = spawned_task_result.next() => {
match some_res {
Ok(TaskResult::RpcGetStakeAccount(Ok(stake_list), epoch_start_slot)) => {
let Ok(mut stake_map) = extract_stakestore(&mut stakestore) else {
//retry later, epoch schedule is currently processed
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(epoch_start_slot, 0)));
continue;
};
//merge new PA with stake map in a specific thread
log::trace!("Run_loop before Program account stake merge START");
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
if let Some((new_schedule, epoch)) = crate::leader_schedule::run_leader_schedule_events(
event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
) {
//current_leader_schedule.leader_schedule = new_schedule;
current_leader_schedule.schedule_epoch = epoch;
let jh = tokio::task::spawn_blocking({
let move_epoch = current_epoch.clone();
move || {
//update pa_list so each account's update slot is set to the epoch start slot.
crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, stake_list, epoch_start_slot, &move_epoch);
TaskResult::MergeStakeList(stake_map)
//TODO remove this verification once the schedule is correct.
//verify the calculated schedule against the one returned by the RPC.
if let Some(schedule) = new_schedule {
tokio::task::spawn_blocking(|| {
//wait a few seconds so the validator has calculated its schedule
std::thread::sleep(std::time::Duration::from_secs(5));
log::info!("Start Verify schedule");
if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
log::warn!("Error during schedule verification:{err}");
}
log::info!("End Verify schedule");
});
spawned_task_result.push(jh);
}
//getPA can fail and should be restarted.
Ok(TaskResult::RpcGetStakeAccount(Err(err), epoch_start_slot)) => {
log::warn!("RPC call getStakeAccount returned an invalid result: {err:?}");
//getPA can fail and should be restarted.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(epoch_start_slot, 0)));
}
Ok(TaskResult::MergeStakeList(stake_map)) => {
if let Err(err) = merge_stakestore(&mut stakestore, stake_map, &current_epoch) {
//should never occur because only one extract can happen at a time.
//during PA processing no epoch schedule can be done.
log::warn!("merge stake on a non-extracted stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(0, 0)));
continue;
};
log::info!("Run_loop Program account stake merge END");
//TODO REMOVE
//To test verify the schedule
// let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
// log::info!("Epoch schedule aborted because a getPA is currently running.");
// continue;
// };
// let jh = tokio::task::spawn_blocking({
// let move_epoch = current_epoch.clone();
// move || {
// let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
// TaskResult::ScheduleResult(schedule.ok(), stake_map)
// }
// });
// spawned_task_result.push(jh);
//end test
}
Ok(TaskResult::RpcGetVoteAccount(Ok(vote_list), epoch_start_slot)) => {
let Ok(mut vote_map) = extract_votestore(&mut votestore) else {
//retry later, epoch schedule is currently processed
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(epoch_start_slot, 0)));
continue;
};
//merge the new PA with the vote map in a specific thread
log::trace!("Run_loop before Program account VOTE merge START");
let jh = tokio::task::spawn_blocking({
move || {
//update pa_list so each account's update slot is set to the epoch start slot.
crate::votestore::merge_program_account_in_vote_map(&mut vote_map, vote_list, epoch_start_slot);
TaskResult::MergeVoteList(vote_map)
}
});
spawned_task_result.push(jh);
}
//getPA can fail and should be restarted.
Ok(TaskResult::RpcGetVoteAccount(Err(err), epoch_start_slot)) => {
log::warn!("RPC call getVoteAccount returned an invalid result: {err:?}");
//getPA can fail and should be restarted.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(epoch_start_slot, 0)));
}
Ok(TaskResult::MergeVoteList(vote_map)) => {
if let Err(err) = merge_votestore(&mut votestore, vote_map) {
//should never occur because only one extract can happen at a time.
//during PA processing no epoch schedule can be done.
log::warn!("merge vote on a non-extracted vote map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(0, 0)));
continue;
};
log::info!("Run_loop Program Vote account merge END");
}
Ok(TaskResult::CurrentEpoch(Ok(epoch_info))) => {
//TODO remove, no longer needed since the epoch is calculated locally.
//only update the new epoch slot if the RPC call returns the next epoch. Sometimes it still returns the current epoch.
if current_epoch.epoch <= epoch_info.epoch {
current_epoch = epoch_info;
//calculate the slot index from the current slot. getEpochInfo doesn't return data for the current slot.
if current_slot.confirmed_slot > current_epoch.absolute_slot {
let diff = current_slot.confirmed_slot - current_epoch.absolute_slot;
current_epoch.absolute_slot += diff;
current_epoch.slot_index += diff;
log::trace!("Update current epoch, diff:{diff}");
}
next_epoch_start_slot = current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
log::info!("Run_loop update new epoch:{current_epoch:?} current slot:{current_slot:?} next_epoch_start_slot:{next_epoch_start_slot}");
} else {
//the RPC epoch hasn't changed, retry
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch));
}
}
Ok(TaskResult::ScheduleResult(schedule_opt, stake_map, vote_map)) => {
//merge stake
let merge_error = match merge_stakestore(&mut stakestore, stake_map, &current_epoch) {
Ok(()) => false,
Err(err) => {
//should never occur because only one extract can happen at a time.
//during PA processing no epoch schedule can be done.
log::warn!("merge stake on a non-extracted stake map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(0, 0)));
true
}
};
//merge vote
if let Err(err) = merge_votestore(&mut votestore, vote_map) {
//should never occur because only one extract can happen at a time.
//during PA processing no epoch schedule can be done.
log::warn!("merge vote on a non-extracted vote map err:{err}");
//restart the getPA.
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetVoteAccount(0, 0)));
continue;
};
if merge_error {
continue;
}
//verify the calculated schedule against the one returned by the RPC.
if let Some(schedule) = schedule_opt {
tokio::task::spawn_blocking(|| {
//wait some seconds so the validator has calculated its schedule
std::thread::sleep(std::time::Duration::from_secs(20));
log::info!("Start Verify schedule");
if let Err(err) = crate::leader_schedule::verify_schedule(schedule,RPC_URL.to_string()) {
log::warn!("Error during schedule verification:{err}");
}
log::info!("End Verify schedule");
});
}
}
_ => log::warn!("RPC call return invalid result: {some_res:?}"),
}
}
@ -443,100 +288,64 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
//store the new stake account.
if let Some(account) = read_account(account, current_slot.confirmed_slot) {
if let Some(account) = read_account(account, current_epoch_state.current_slot.confirmed_slot) {
//log::trace!("Geyser receive new account");
match account.owner {
solana_sdk::stake::program::ID => {
if let Err(err) = stakestore.add_stake(account, next_epoch_start_slot-1, &current_epoch) {
if let Err(err) = stakestore.add_stake(
account,
current_epoch_state.current_epoch_end_slot(),
current_epoch_state.current_epoch.epoch,
) {
log::warn!("Can't add new stake from account data err:{}", err);
continue;
}
}
solana_sdk::vote::program::ID => {
//process the vote account notification
if let Err(err) = votestore.add_vote(account, next_epoch_start_slot-1) {
if let Err(err) = votestore.add_vote(account, current_epoch_state.current_epoch_end_slot()) {
log::warn!("Can't add new vote account from account data err:{}", err);
continue;
}
}
solana_sdk::system_program::ID => {
log::info!("system_program account:{}",account.pubkey);
}
_ => log::warn!("receive an account notification from a unknown owner:{account:?}"),
}
}
}
Some(UpdateOneof::Slot(slot)) => {
//on the first slot update, correct the epoch info data.
if let CommitmentLevel::Confirmed = slot.status(){
if current_slot.confirmed_slot == 0 {
let diff = slot.slot - current_epoch.absolute_slot;
current_epoch.absolute_slot += diff;
current_epoch.slot_index += diff;
log::trace!("Set current epoch with diff:{diff} slot:{} current:{}", slot.slot, current_epoch.absolute_slot);
}
}
//update current slot
log::trace!("Receive slot slot: {slot:?}");
//TODO remove log
//log::info!("Processing slot: {:?} current slot:{:?}", slot, current_slot);
log_end_epoch(current_slot.confirmed_slot, next_epoch_start_slot, current_epoch.slot_index, format!("Receive slot: {:?} at commitment:{:?}", slot.slot, slot.status()));
// log_end_epoch(
// current_epoch_state.current_slot.confirmed_slot,
// current_epoch_state.next_epoch_start_slot,
// current_epoch_state.current_epoch.slot_index,
// format!(
// "Receive slot: {:?} at commitment:{:?}",
// slot.slot,
// slot.status()
// ),
// );
//update epoch info
if let CommitmentLevel::Confirmed = slot.status(){
if current_slot.confirmed_slot != 0 && slot.slot > current_slot.confirmed_slot {
let diff = slot.slot - current_slot.confirmed_slot;
current_epoch.slot_index += diff;
current_epoch.absolute_slot += diff;
log::trace!("Update epoch with slot, diff:{diff}");
}
let schedule_event = current_epoch_state.process_new_slot(&slot);
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
}
current_slot.update_slot(&slot);
if current_slot.confirmed_slot >= next_epoch_start_slot-1 { //slots can be non-consecutive.
log::info!("End epoch slot, change epoch. Calculate schedule at current slot:{}", current_slot.confirmed_slot);
let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
log::info!("Epoch schedule aborted because a extract_stakestore faild.");
continue;
};
let Ok(vote_map) = extract_votestore(&mut votestore) else {
log::info!("Epoch schedule aborted because extract_votestore faild.");
//cancel stake extraction
merge_stakestore(&mut stakestore, stake_map, &current_epoch).unwrap(); //just extracted.
continue;
};
//reload PA account for new epoch start. TODO replace with bootstrap.
//spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(next_epoch_start_slot, 30))); //Wait 30s to get new PA.
//change epoch. Change manually then update using RPC.
current_epoch.epoch +=1;
current_epoch.slot_index = 1;
next_epoch_start_slot = next_epoch_start_slot + current_epoch.slots_in_epoch; //set to the next epoch.
log::info!("End slot epoch update calculated next epoch:{current_epoch:?}");
//calculate schedule in a dedicated thread.
let jh = tokio::task::spawn_blocking({
let move_epoch = current_epoch.clone();
move || {
let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &vote_map, &move_epoch);
log::info!("End calculate leader schedule at slot:{}", current_slot.confirmed_slot);
TaskResult::ScheduleResult(schedule.ok(), stake_map, vote_map)
}
});
spawned_task_result.push(jh);
spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch));
//reload the current stake accounts at epoch change to synchronize.
}
}
Some(UpdateOneof::BlockMeta(block_meta)) => {
log::info!("Receive Block Meta at slot: {}", block_meta.slot);
}
Some(UpdateOneof::Block(block)) => {
println!("Receive Block at slot: {}", block.slot);
log::info!("Receive Block at slot: {}", block.slot);
}
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
bad_msg => {
@ -579,37 +388,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//Ok(())
}
#[derive(Default, Debug, Clone)]
struct CurrentSlot {
processed_slot: u64,
confirmed_slot: u64,
finalized_slot: u64,
}
impl CurrentSlot {
fn update_slot(&mut self, slot: &SubscribeUpdateSlot) {
let updade = |commitment: &str, current_slot: &mut u64, new_slot: u64| {
//verify that the slot is consecutive
if *current_slot != 0 && new_slot != *current_slot + 1 {
log::trace!(
"At {commitment} not consecutif slot send: current_slot:{} new_slot{}",
current_slot,
new_slot
);
}
*current_slot = new_slot
};
match slot.status() {
CommitmentLevel::Processed => updade("Processed", &mut self.processed_slot, slot.slot),
CommitmentLevel::Confirmed => updade("Confirmed", &mut self.confirmed_slot, slot.slot),
CommitmentLevel::Finalized => updade("Finalized", &mut self.finalized_slot, slot.slot),
}
}
}
#[derive(Debug)]
#[allow(dead_code)]
pub struct AccountPretty {
@ -671,24 +449,3 @@ fn read_account(
txn_signature: bs58::encode(inner_account.txn_signature.unwrap_or_default()).into_string(),
})
}
#[derive(Debug)]
enum TaskToExec {
RpcGetStakeAccount(u64, u64), //epoch_start_slot, sleep time
RpcGetVoteAccount(u64, u64), //epoch_start_slot, sleep time
RpcGetCurrentEpoch,
}
#[derive(Debug)]
enum TaskResult {
RpcGetStakeAccount(Result<Vec<(Pubkey, Account)>, ClientError>, u64), //stake account list, epoch_start_slot
RpcGetVoteAccount(Result<Vec<(Pubkey, Account)>, ClientError>, u64), //vote account list, epoch_start_slot
CurrentEpoch(Result<EpochInfo, ClientError>),
MergeStakeList(crate::stakestore::StakeMap),
MergeVoteList(crate::votestore::VoteMap),
ScheduleResult(
Option<LeaderSchedule>,
crate::stakestore::StakeMap,
crate::votestore::VoteMap,
),
}

View File

@ -30,6 +30,7 @@ pub enum Requests {
pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHandle, RpcError> {
let server = Server::builder()
.max_response_body_size(1048576000)
.build(RPC_ADDRESS.parse::<SocketAddr>()?)
.await?;
let mut module = RpcModule::new(request_tx);

View File

@ -4,7 +4,6 @@ use anyhow::bail;
use borsh::BorshDeserialize;
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
@ -22,10 +21,10 @@ pub fn extract_stakestore(stakestore: &mut StakeStore) -> anyhow::Result<StakeMa
pub fn merge_stakestore(
stakestore: &mut StakeStore,
stake_map: StakeMap,
current_epoch: &EpochInfo,
current_epoch_slot: Slot,
) -> anyhow::Result<()> {
let new_store = std::mem::take(stakestore);
let new_store = new_store.merge_stake(stake_map, current_epoch)?;
let new_store = new_store.merge_stake(stake_map, current_epoch_slot)?;
*stakestore = new_store;
Ok(())
}
@ -34,12 +33,12 @@ fn stake_map_insert_stake(
map: &mut StakeMap,
stake_account: Pubkey,
stake: StoredStake,
current_epoch: &EpochInfo,
current_epoch: u64,
) {
//don't add stakes that are already deactivated.
//there are some strange stakes with activation_epoch = max epoch and deactivation_epoch = 90 that are taken into account.
//This must be better defined.
// if stake.stake.deactivation_epoch < current_epoch.epoch {
// if stake.stake.deactivation_epoch < current_epoch {
// return;
// }
match map.entry(stake_account) {
@ -50,7 +49,7 @@ fn stake_map_insert_stake(
if strstake.stake.stake != stake.stake.stake {
log::info!(
"Stake updated for: {stake_account} old_stake:{} stake:{stake:?}",
stake.stake.stake
strstake.stake.stake
);
}
*strstake = stake;
@ -111,7 +110,7 @@ impl StakeStore {
Ok((stakestore, self.stakes))
}
pub fn merge_stake(self, stakes: StakeMap, current_epoch: &EpochInfo) -> anyhow::Result<Self> {
pub fn merge_stake(self, stakes: StakeMap, current_epoch: u64) -> anyhow::Result<Self> {
if !self.extracted {
bail!("StakeStore merge of non extracted map. Try later");
}
@ -123,7 +122,7 @@ impl StakeStore {
//apply stake added during extraction.
for (stake_account, stake) in self.updates {
stakestore.insert_stake(stake_account, stake, &current_epoch);
stakestore.insert_stake(stake_account, stake, current_epoch);
}
Ok(stakestore)
}
@ -132,7 +131,7 @@ impl StakeStore {
&mut self,
new_account: AccountPretty,
current_end_epoch_slot: Slot,
current_epoch: &EpochInfo,
current_epoch: u64,
) -> anyhow::Result<()> {
let Ok(delegated_stake_opt) = new_account.read_stake() else {
bail!("Can't read stake from account data");
@ -158,12 +157,7 @@ impl StakeStore {
Ok(())
}
fn insert_stake(
&mut self,
stake_account: Pubkey,
stake: StoredStake,
current_epoch: &EpochInfo,
) {
fn insert_stake(&mut self, stake_account: Pubkey, stake: StoredStake, current_epoch: u64) {
stake_map_insert_stake(&mut self.stakes, stake_account, stake, current_epoch);
}
}
@ -172,7 +166,7 @@ pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap,
pa_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
current_epoch: &EpochInfo,
current_epoch: u64,
) {
pa_list
.into_iter()