//RUST_BACKTRACE=1 RUST_LOG=stake_aggregate=trace cargo run --release
/*
RPC calls:
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "save_stakes",
    "params": []
}
'
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
    "jsonrpc": "2.0",
    "id": 1,
    "method": "bootstrap_accounts",
    "params": []
}
'
*/

//TODO: add stake verification that it's not already deactivated.
2023-08-29 03:16:57 -07:00
use crate ::stakestore ::extract_stakestore ;
use crate ::stakestore ::merge_stakestore ;
2023-08-11 00:13:31 -07:00
use crate ::stakestore ::StakeStore ;
2023-09-08 11:07:43 -07:00
use crate ::votestore ::extract_votestore ;
use crate ::votestore ::merge_votestore ;
use crate ::votestore ::VoteStore ;
2023-08-29 03:16:57 -07:00
use anyhow ::bail ;
2023-08-11 00:13:31 -07:00
use futures_util ::stream ::FuturesUnordered ;
use futures_util ::StreamExt ;
use solana_client ::client_error ::ClientError ;
use solana_client ::nonblocking ::rpc_client ::RpcClient ;
use solana_ledger ::leader_schedule ::LeaderSchedule ;
use solana_sdk ::account ::Account ;
use solana_sdk ::commitment_config ::CommitmentConfig ;
use solana_sdk ::epoch_info ::EpochInfo ;
use solana_sdk ::pubkey ::Pubkey ;
use solana_sdk ::stake ::state ::Delegation ;
2023-08-29 03:16:57 -07:00
use solana_sdk ::vote ::state ::VoteState ;
2023-08-11 00:13:31 -07:00
use std ::collections ::HashMap ;
2023-08-30 07:06:11 -07:00
use tokio ::time ::Duration ;
2023-08-11 00:13:31 -07:00
use yellowstone_grpc_client ::GeyserGrpcClient ;
use yellowstone_grpc_proto ::geyser ::CommitmentLevel ;
use yellowstone_grpc_proto ::geyser ::SubscribeUpdateAccount ;
use yellowstone_grpc_proto ::prelude ::SubscribeRequestFilterAccounts ;
2023-09-02 08:52:28 -07:00
use yellowstone_grpc_proto ::prelude ::SubscribeRequestFilterBlocks ;
use yellowstone_grpc_proto ::prelude ::SubscribeRequestFilterBlocksMeta ;
2023-08-11 00:13:31 -07:00
use yellowstone_grpc_proto ::{
prelude ::{ subscribe_update ::UpdateOneof , SubscribeRequestFilterSlots , SubscribeUpdateSlot } ,
tonic ::service ::Interceptor ,
} ;
mod leader_schedule ;
2023-09-05 03:53:28 -07:00
mod rpc ;
2023-08-11 00:13:31 -07:00
mod stakestore ;
2023-09-08 11:07:43 -07:00
mod votestore ;
2023-08-11 00:13:31 -07:00
type Slot = u64 ;
2023-08-12 01:58:41 -07:00
//WebSocket URL: ws://localhost:8900/ (computed)
2023-09-01 01:07:42 -07:00
const GRPC_URL : & str = " http://localhost:10000 " ;
const RPC_URL : & str = " http://localhost:8899 " ;
2023-08-30 07:06:11 -07:00
2023-08-12 01:58:41 -07:00
//const RPC_URL: &str = "https://api.mainnet-beta.solana.com";
2023-08-11 00:13:31 -07:00
//const RPC_URL: &str = "https://api.testnet.solana.com";
//const RPC_URL: &str = "https://api.devnet.solana.com";
const STAKESTORE_INITIAL_CAPACITY : usize = 600000 ;
2023-09-08 11:07:43 -07:00
const VOTESTORE_INITIAL_CAPACITY : usize = 600000 ;
2023-08-11 00:13:31 -07:00
2023-09-05 07:18:34 -07:00
pub fn log_end_epoch (
current_slot : Slot ,
end_epoch_slot : Slot ,
epoch_slot_index : Slot ,
msg : String ,
) {
2023-09-02 08:52:28 -07:00
//log 50 end slot.
2023-09-05 07:18:34 -07:00
if current_slot ! = 0 & & current_slot + 20 > end_epoch_slot {
log ::info! ( " {current_slot}/{end_epoch_slot} {} " , msg ) ;
}
if epoch_slot_index < 20 {
2023-09-02 08:52:28 -07:00
log ::info! ( " {current_slot}/{end_epoch_slot} {} " , msg ) ;
}
}
2023-08-11 00:13:31 -07:00
/// Connect to yellow stone plugin using yellow stone gRpc Client
#[ tokio::main ]
async fn main ( ) -> anyhow ::Result < ( ) > {
tracing_subscriber ::fmt ::init ( ) ;
let mut client = GeyserGrpcClient ::connect ( GRPC_URL , None ::< & 'static str > , None ) ? ;
let version = client . get_version ( ) . await ? ;
println! ( " Version: {:?} " , version ) ;
let ctrl_c_signal = tokio ::signal ::ctrl_c ( ) ;
tokio ::select! {
res = run_loop ( client ) = > {
// This should never happen
log ::error! ( " Services quit unexpectedly {res:?} " ) ;
}
_ = ctrl_c_signal = > {
log ::info! ( " Received ctrl+c signal " ) ;
}
}
Ok ( ( ) )
}
async fn run_loop < F : Interceptor > ( mut client : GeyserGrpcClient < F > ) -> anyhow ::Result < ( ) > {
//local vars
let mut current_slot : CurrentSlot = Default ::default ( ) ;
let mut stakestore = StakeStore ::new ( STAKESTORE_INITIAL_CAPACITY ) ;
let mut current_epoch = {
let rpc_client =
RpcClient ::new_with_commitment ( RPC_URL . to_string ( ) , CommitmentConfig ::finalized ( ) ) ;
// Fetch current epoch
rpc_client . get_epoch_info ( ) . await ?
} ;
2023-08-31 11:23:57 -07:00
let mut next_epoch_start_slot =
2023-08-31 06:27:47 -07:00
current_epoch . slots_in_epoch - current_epoch . slot_index + current_epoch . absolute_slot ;
2023-09-06 06:59:14 -07:00
log ::info! ( " Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?} " ) ;
2023-08-11 00:13:31 -07:00
let mut spawned_task_toexec = FuturesUnordered ::new ( ) ;
let mut spawned_task_result = FuturesUnordered ::new ( ) ;
2023-08-30 07:06:11 -07:00
//use to set an initial state of all PA
2023-09-08 11:07:43 -07:00
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetStakeAccount (
current_epoch . absolute_slot - current_epoch . slot_index ,
0 ,
) ) ) ;
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetVoteAccount (
2023-09-06 06:59:14 -07:00
current_epoch . absolute_slot - current_epoch . slot_index ,
0 ,
) ) ) ;
2023-08-11 00:13:31 -07:00
2023-08-29 03:16:57 -07:00
//subscribe Geyser grpc
2023-09-02 08:52:28 -07:00
//slot subscription
2023-08-11 00:13:31 -07:00
let mut slots = HashMap ::new ( ) ;
slots . insert ( " client " . to_string ( ) , SubscribeRequestFilterSlots { } ) ;
2023-09-02 08:52:28 -07:00
//account subscription
2023-08-11 00:13:31 -07:00
let mut accounts : HashMap < String , SubscribeRequestFilterAccounts > = HashMap ::new ( ) ;
accounts . insert (
" client " . to_owned ( ) ,
SubscribeRequestFilterAccounts {
account : vec ! [ ] ,
2023-08-29 03:16:57 -07:00
owner : vec ! [
solana_sdk ::stake ::program ::ID . to_string ( ) ,
2023-09-11 09:27:21 -07:00
solana_sdk ::vote ::program ::ID . to_string ( ) ,
2023-08-29 03:16:57 -07:00
] ,
2023-08-11 00:13:31 -07:00
filters : vec ! [ ] ,
} ,
) ;
2023-09-02 08:52:28 -07:00
//block subscription
let mut blocks = HashMap ::new ( ) ;
blocks . insert (
" client " . to_string ( ) ,
SubscribeRequestFilterBlocks {
account_include : Default ::default ( ) ,
include_transactions : Some ( true ) ,
include_accounts : Some ( false ) ,
2023-09-02 09:11:01 -07:00
include_entries : None ,
2023-09-02 08:52:28 -07:00
} ,
) ;
//block Meta subscription filter
let mut blocks_meta = HashMap ::new ( ) ;
blocks_meta . insert ( " client " . to_string ( ) , SubscribeRequestFilterBlocksMeta { } ) ;
2023-08-11 00:13:31 -07:00
let mut confirmed_stream = client
. subscribe_once (
2023-08-30 09:21:10 -07:00
slots . clone ( ) ,
accounts . clone ( ) , //accounts
2023-08-11 00:13:31 -07:00
Default ::default ( ) , //tx
Default ::default ( ) , //entry
2023-09-02 08:52:28 -07:00
//blocks, //full block
2023-08-11 00:13:31 -07:00
Default ::default ( ) , //full block
2023-09-02 08:52:28 -07:00
//blocks_meta, //block meta
2023-08-11 00:13:31 -07:00
Default ::default ( ) , //block meta
Some ( CommitmentLevel ::Confirmed ) ,
vec! [ ] ,
)
. await ? ;
2023-08-31 03:13:27 -07:00
//log current data at interval
2023-09-01 01:07:42 -07:00
let mut log_interval = tokio ::time ::interval ( Duration ::from_millis ( 600000 ) ) ;
2023-08-31 03:13:27 -07:00
2023-09-05 03:53:28 -07:00
//start local rpc access to execute command.
let ( request_tx , mut request_rx ) = tokio ::sync ::mpsc ::channel ( 100 ) ;
let rpc_handle = crate ::rpc ::run_server ( request_tx ) . await ? ;
//make it run forever
tokio ::spawn ( rpc_handle . stopped ( ) ) ;
2023-09-08 11:07:43 -07:00
//Vote account management struct
let mut votestore = VoteStore ::new ( VOTESTORE_INITIAL_CAPACITY ) ;
2023-08-11 00:13:31 -07:00
loop {
tokio ::select! {
2023-09-11 09:27:21 -07:00
Some ( req ) = request_rx . recv ( ) = > {
match req {
crate ::rpc ::Requests ::SaveStakes = > {
tokio ::task ::spawn_blocking ( {
log ::info! ( " RPC start save_stakes " ) ;
let current_stakes = stakestore . get_cloned_stake_map ( ) ;
let move_epoch = current_epoch . clone ( ) ;
move | | {
let current_stake = crate ::leader_schedule ::build_current_stakes ( & current_stakes , & move_epoch , RPC_URL . to_string ( ) , CommitmentConfig ::confirmed ( ) ) ;
log ::info! ( " RPC save_stakes generation done " ) ;
if let Err ( err ) = crate ::leader_schedule ::save_schedule_on_file ( " stakes " , & current_stake ) {
log ::error! ( " Error during current stakes saving:{err} " ) ;
}
log ::info! ( " RPC save_stakes END " ) ;
2023-09-05 03:53:28 -07:00
2023-09-11 09:27:21 -07:00
}
} ) ;
2023-09-05 03:53:28 -07:00
}
2023-09-11 09:27:21 -07:00
crate ::rpc ::Requests ::BootstrapAccounts ( tx ) = > {
log ::info! ( " RPC start save_stakes " ) ;
let current_stakes = stakestore . get_cloned_stake_map ( ) ;
if let Err ( err ) = tx . send ( ( current_stakes , current_slot . confirmed_slot ) ) {
println! ( " Channel error during sending bacl request status error: {err:?} " ) ;
}
log ::info! ( " RPC bootstrap account send " ) ;
}
}
2023-09-05 03:53:28 -07:00
} ,
2023-08-31 03:13:27 -07:00
//log interval
_ = log_interval . tick ( ) = > {
2023-08-31 11:23:57 -07:00
log ::info! ( " Run_loop update new epoch:{current_epoch:?} current slot:{current_slot:?} next epoch start slot:{next_epoch_start_slot} " ) ;
2023-09-10 08:16:58 -07:00
log ::info! ( " Change epoch equality {} >= {} " , current_slot . confirmed_slot , next_epoch_start_slot - 1 ) ;
2023-08-31 03:13:27 -07:00
log ::info! ( " number of stake accounts:{} " , stakestore . nb_stake_account ( ) ) ;
}
2023-08-11 00:13:31 -07:00
//Execute RPC call in another task
2023-08-31 03:13:27 -07:00
Some ( to_exec ) = spawned_task_toexec . next ( ) = > {
2023-08-11 00:13:31 -07:00
let jh = tokio ::spawn ( async move {
match to_exec {
2023-09-08 11:07:43 -07:00
TaskToExec ::RpcGetStakeAccount ( epoch_start_slot , sleep_time ) = > {
if sleep_time > 0 {
tokio ::time ::sleep ( Duration ::from_secs ( sleep_time ) ) . await ;
}
log ::info! ( " TaskToExec RpcGetStakeAccount start " ) ;
let rpc_client = RpcClient ::new_with_timeout_and_commitment ( RPC_URL . to_string ( ) , Duration ::from_secs ( 600 ) , CommitmentConfig ::finalized ( ) ) ;
let res_stake = rpc_client . get_program_accounts ( & solana_sdk ::stake ::program ::id ( ) ) . await ;
log ::info! ( " TaskToExec RpcGetStakeAccount END " ) ;
TaskResult ::RpcGetStakeAccount ( res_stake , epoch_start_slot )
} ,
TaskToExec ::RpcGetVoteAccount ( epoch_start_slot , sleep_time ) = > {
2023-09-06 06:59:14 -07:00
if sleep_time > 0 {
tokio ::time ::sleep ( Duration ::from_secs ( sleep_time ) ) . await ;
}
2023-09-08 11:07:43 -07:00
log ::info! ( " TaskToExec RpcGetVoteAccount start " ) ;
2023-08-30 07:06:11 -07:00
let rpc_client = RpcClient ::new_with_timeout_and_commitment ( RPC_URL . to_string ( ) , Duration ::from_secs ( 600 ) , CommitmentConfig ::finalized ( ) ) ;
2023-09-08 11:07:43 -07:00
let res_vote = rpc_client . get_program_accounts ( & solana_sdk ::vote ::program ::id ( ) ) . await ;
log ::info! ( " TaskToExec RpcGetVoteAccount END " ) ;
TaskResult ::RpcGetVoteAccount ( res_vote , epoch_start_slot )
2023-08-11 00:13:31 -07:00
} ,
TaskToExec ::RpcGetCurrentEpoch = > {
2023-08-31 11:23:57 -07:00
//TODO remove no need epoch is calculated.
2023-08-31 03:13:27 -07:00
log ::info! ( " TaskToExec RpcGetCurrentEpoch start " ) ;
2023-08-31 11:23:57 -07:00
//wait 1 sec to be sure RPC change epoch
tokio ::time ::sleep ( Duration ::from_secs ( 1 ) ) . await ;
2023-09-05 03:53:28 -07:00
let rpc_client = RpcClient ::new_with_timeout_and_commitment ( RPC_URL . to_string ( ) , Duration ::from_secs ( 600 ) , CommitmentConfig ::finalized ( ) ) ;
2023-08-11 00:13:31 -07:00
let res = rpc_client . get_epoch_info ( ) . await ;
TaskResult ::CurrentEpoch ( res )
}
}
} ) ;
spawned_task_result . push ( jh ) ;
}
//Manage RPC call result execution
Some ( some_res ) = spawned_task_result . next ( ) = > {
match some_res {
2023-09-08 11:07:43 -07:00
Ok ( TaskResult ::RpcGetStakeAccount ( Ok ( stake_list ) , epoch_start_slot ) ) = > {
2023-08-12 01:58:41 -07:00
let Ok ( mut stake_map ) = extract_stakestore ( & mut stakestore ) else {
2023-08-11 00:13:31 -07:00
//retry later, epoch schedule is currently processed
2023-09-08 11:07:43 -07:00
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetStakeAccount ( epoch_start_slot , 0 ) ) ) ;
2023-08-11 00:13:31 -07:00
continue ;
} ;
//merge new PA with stake map in a specific thread
2023-08-12 01:58:41 -07:00
log ::trace! ( " Run_loop before Program account stake merge START " ) ;
2023-08-11 00:13:31 -07:00
2023-09-07 02:13:58 -07:00
let jh = tokio ::task ::spawn_blocking ( {
let move_epoch = current_epoch . clone ( ) ;
move | | {
//update pa_list to set slot update to start epoq one.
2023-09-08 11:07:43 -07:00
crate ::stakestore ::merge_program_account_in_strake_map ( & mut stake_map , stake_list , epoch_start_slot , & move_epoch ) ;
TaskResult ::MergeStakeList ( stake_map )
2023-09-07 02:13:58 -07:00
}
2023-08-11 00:13:31 -07:00
} ) ;
spawned_task_result . push ( jh ) ;
}
2023-08-30 07:06:11 -07:00
//getPA can fail should be retarted.
2023-09-08 11:07:43 -07:00
Ok ( TaskResult ::RpcGetStakeAccount ( Err ( err ) , epoch_start_slot ) ) = > {
log ::warn! ( " RPC call get Stake Account return invalid result: {err:?} " ) ;
2023-08-30 07:06:11 -07:00
//get pa can fail should be retarted.
2023-09-08 11:07:43 -07:00
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetStakeAccount ( epoch_start_slot , 0 ) ) ) ;
2023-08-30 07:06:11 -07:00
}
2023-09-08 11:07:43 -07:00
Ok ( TaskResult ::MergeStakeList ( stake_map ) ) = > {
2023-09-07 02:13:58 -07:00
if let Err ( err ) = merge_stakestore ( & mut stakestore , stake_map , & current_epoch ) {
2023-08-12 01:58:41 -07:00
//should never occurs because only one extract can occurs at time.
2023-08-11 00:13:31 -07:00
// during PA no epoch schedule can be done.
2023-08-12 01:58:41 -07:00
log ::warn! ( " merge stake on a non extract stake map err:{err} " ) ;
2023-08-11 00:13:31 -07:00
//restart the getPA.
2023-09-08 11:07:43 -07:00
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetStakeAccount ( 0 , 0 ) ) ) ;
2023-08-11 00:13:31 -07:00
continue ;
} ;
2023-08-30 07:06:11 -07:00
log ::info! ( " Run_loop Program account stake merge END " ) ;
2023-08-12 01:58:41 -07:00
//TODO REMOVE
//To test verify the schedule
2023-09-01 01:07:42 -07:00
// let Ok(stake_map) = extract_stakestore(&mut stakestore) else {
// log::info!("Epoch schedule aborted because a getPA is currently running.");
// continue;
// };
// let jh = tokio::task::spawn_blocking({
// let move_epoch = current_epoch.clone();
// move || {
// let schedule = crate::leader_schedule::calculate_leader_schedule_from_stake_map(&stake_map, &move_epoch);
// TaskResult::ScheduleResult(schedule.ok(), stake_map)
// }
// });
// spawned_task_result.push(jh);
2023-08-12 01:58:41 -07:00
//end test
}
2023-09-08 11:07:43 -07:00
Ok ( TaskResult ::RpcGetVoteAccount ( Ok ( vote_list ) , epoch_start_slot ) ) = > {
let Ok ( mut vote_map ) = extract_votestore ( & mut votestore ) else {
//retry later, epoch schedule is currently processed
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetVoteAccount ( epoch_start_slot , 0 ) ) ) ;
continue ;
} ;
//merge new PA with stake map in a specific thread
log ::trace! ( " Run_loop before Program account VOTE merge START " ) ;
let jh = tokio ::task ::spawn_blocking ( {
move | | {
//update pa_list to set slot update to start epoq one.
crate ::votestore ::merge_program_account_in_vote_map ( & mut vote_map , vote_list , epoch_start_slot ) ;
TaskResult ::MergeVoteList ( vote_map )
}
} ) ;
spawned_task_result . push ( jh ) ;
}
//getPA can fail should be retarted.
Ok ( TaskResult ::RpcGetVoteAccount ( Err ( err ) , epoch_start_slot ) ) = > {
log ::warn! ( " RPC call getVote account return invalid result: {err:?} " ) ;
//get pa can fail should be retarted.
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetVoteAccount ( epoch_start_slot , 0 ) ) ) ;
}
Ok ( TaskResult ::MergeVoteList ( vote_map ) ) = > {
if let Err ( err ) = merge_votestore ( & mut votestore , vote_map ) {
//should never occurs because only one extract can occurs at time.
// during PA no epoch schedule can be done.
log ::warn! ( " merge vote on a non extract stake map err:{err} " ) ;
//restart the getPA.
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetVoteAccount ( 0 , 0 ) ) ) ;
continue ;
} ;
log ::info! ( " Run_loop Program Vote account merge END " ) ;
}
Ok ( TaskResult ::CurrentEpoch ( Ok ( epoch_info ) ) ) = > {
//TODO remove no need epoch is calculated.
//only update new epoch slot if the RPC call return the next epoch. Some time it still return the current epoch.
if current_epoch . epoch < = epoch_info . epoch {
current_epoch = epoch_info ;
//calcualte slotindex with current slot. getEpichInfo doesn't return data on current slot.
if current_slot . confirmed_slot > current_epoch . absolute_slot {
let diff = current_slot . confirmed_slot - current_epoch . absolute_slot ;
current_epoch . absolute_slot + = diff ;
current_epoch . slot_index + = diff ;
log ::trace! ( " Update current epoch, diff:{diff} " ) ;
}
next_epoch_start_slot = current_epoch . slots_in_epoch - current_epoch . slot_index + current_epoch . absolute_slot ;
log ::info! ( " Run_loop update new epoch:{current_epoch:?} current slot:{current_slot:?} next_epoch_start_slot:{next_epoch_start_slot} " ) ;
} else {
//RPC epoch hasn't changed retry
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetCurrentEpoch ) ) ;
}
}
2023-09-09 02:55:28 -07:00
Ok ( TaskResult ::ScheduleResult ( schedule_opt , stake_map , vote_map ) ) = > {
2023-08-12 01:58:41 -07:00
//merge stake
2023-09-09 02:55:28 -07:00
let merge_error = match merge_stakestore ( & mut stakestore , stake_map , & current_epoch ) {
Ok ( ( ) ) = > false ,
Err ( err ) = > {
//should never occurs because only one extract can occurs at time.
// during PA no epoch schedule can be done.
log ::warn! ( " merge stake on a non extract stake map err:{err} " ) ;
//restart the getPA.
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetStakeAccount ( 0 , 0 ) ) ) ;
true
}
} ;
//merge vote
if let Err ( err ) = merge_votestore ( & mut votestore , vote_map ) {
2023-08-12 01:58:41 -07:00
//should never occurs because only one extract can occurs at time.
// during PA no epoch schedule can be done.
log ::warn! ( " merge stake on a non extract stake map err:{err} " ) ;
//restart the getPA.
2023-09-09 02:55:28 -07:00
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetVoteAccount ( 0 , 0 ) ) ) ;
2023-08-12 01:58:41 -07:00
continue ;
} ;
2023-09-09 02:55:28 -07:00
if merge_error {
continue ;
}
2023-08-12 01:58:41 -07:00
//verify calculated shedule with the one the RPC return.
if let Some ( schedule ) = schedule_opt {
tokio ::task ::spawn_blocking ( | | {
2023-09-03 05:40:43 -07:00
//10 second that the schedule has been calculated on the validator
2023-09-09 02:55:28 -07:00
std ::thread ::sleep ( std ::time ::Duration ::from_secs ( 20 ) ) ;
2023-09-05 07:18:34 -07:00
log ::info! ( " Start Verify schedule " ) ;
2023-08-12 01:58:41 -07:00
if let Err ( err ) = crate ::leader_schedule ::verify_schedule ( schedule , RPC_URL . to_string ( ) ) {
log ::warn! ( " Error during schedule verification:{err} " ) ;
}
2023-09-05 07:18:34 -07:00
log ::info! ( " End Verify schedule " ) ;
2023-08-12 01:58:41 -07:00
} ) ;
}
2023-08-11 00:13:31 -07:00
}
_ = > log ::warn! ( " RPC call return invalid result: {some_res:?} " ) ,
}
}
//get confirmed slot or account
ret = confirmed_stream . next ( ) = > {
match ret {
Some ( message ) = > {
//process the message
match message {
Ok ( msg ) = > {
2023-08-18 07:14:04 -07:00
//log::info!("new message: {msg:?}");
2023-08-11 00:13:31 -07:00
match msg . update_oneof {
Some ( UpdateOneof ::Account ( account ) ) = > {
//store new account stake.
if let Some ( account ) = read_account ( account , current_slot . confirmed_slot ) {
2023-08-29 03:16:57 -07:00
//log::trace!("Geyser receive new account");
match account . owner {
solana_sdk ::stake ::program ::ID = > {
2023-09-07 02:13:58 -07:00
if let Err ( err ) = stakestore . add_stake ( account , next_epoch_start_slot - 1 , & current_epoch ) {
2023-08-29 03:16:57 -07:00
log ::warn! ( " Can't add new stake from account data err:{} " , err ) ;
continue ;
}
}
solana_sdk ::vote ::program ::ID = > {
//process vote accout notification
2023-09-08 11:07:43 -07:00
if let Err ( err ) = votestore . add_vote ( account , next_epoch_start_slot - 1 ) {
log ::warn! ( " Can't add new stake from account data err:{} " , err ) ;
continue ;
2023-08-29 03:16:57 -07:00
}
}
_ = > log ::warn! ( " receive an account notification from a unknown owner:{account:?} " ) ,
2023-08-11 00:13:31 -07:00
}
}
}
Some ( UpdateOneof ::Slot ( slot ) ) = > {
2023-09-05 07:18:34 -07:00
//for the first update of slot correct epoch info data.
if let CommitmentLevel ::Confirmed = slot . status ( ) {
if current_slot . confirmed_slot = = 0 {
let diff = slot . slot - current_epoch . absolute_slot ;
current_epoch . absolute_slot + = diff ;
current_epoch . slot_index + = diff ;
log ::trace! ( " Set current epoch with diff:{diff} slot:{} current:{} " , slot . slot , current_epoch . absolute_slot ) ;
}
}
2023-08-11 00:13:31 -07:00
//update current slot
2023-08-31 05:24:16 -07:00
//log::info!("Processing slot: {:?} current slot:{:?}", slot, current_slot);
2023-09-05 07:18:34 -07:00
log_end_epoch ( current_slot . confirmed_slot , next_epoch_start_slot , current_epoch . slot_index , format! ( " Receive slot: {:?} at commitment: {:?} " , slot . slot , slot . status ( ) ) ) ;
//update epoch info
if let CommitmentLevel ::Confirmed = slot . status ( ) {
if current_slot . confirmed_slot ! = 0 & & slot . slot > current_slot . confirmed_slot {
let diff = slot . slot - current_slot . confirmed_slot ;
current_epoch . slot_index + = diff ;
current_epoch . absolute_slot + = diff ;
log ::trace! ( " Update epoch with slot, diff:{diff} " ) ;
}
}
2023-09-02 08:52:28 -07:00
2023-08-11 00:13:31 -07:00
current_slot . update_slot ( & slot ) ;
2023-09-02 08:52:28 -07:00
if current_slot . confirmed_slot > = next_epoch_start_slot - 1 { //slot can be non consecutif.
2023-09-06 04:50:36 -07:00
log ::info! ( " End epoch slot, change epoch. Calculate schedule at current slot:{} " , current_slot . confirmed_slot ) ;
2023-08-12 01:58:41 -07:00
let Ok ( stake_map ) = extract_stakestore ( & mut stakestore ) else {
2023-09-09 02:55:28 -07:00
log ::info! ( " Epoch schedule aborted because a extract_stakestore faild. " ) ;
continue ;
} ;
let Ok ( vote_map ) = extract_votestore ( & mut votestore ) else {
log ::info! ( " Epoch schedule aborted because extract_votestore faild. " ) ;
//cancel stake extraction
merge_stakestore ( & mut stakestore , stake_map , & current_epoch ) . unwrap ( ) ; //just extracted.
2023-08-11 00:13:31 -07:00
continue ;
} ;
2023-09-06 06:59:14 -07:00
//reload PA account for new epoch start. TODO replace with bootstrap.
2023-09-09 02:55:28 -07:00
//spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetStakeAccount(next_epoch_start_slot, 30))); //Wait 30s to get new PA.
2023-09-06 06:59:14 -07:00
2023-08-18 07:14:04 -07:00
//change epoch. Change manually then update using RPC.
current_epoch . epoch + = 1 ;
2023-09-05 07:18:34 -07:00
current_epoch . slot_index = 1 ;
2023-08-31 11:23:57 -07:00
next_epoch_start_slot = next_epoch_start_slot + current_epoch . slots_in_epoch ; //set to next epochs.
2023-08-31 06:27:47 -07:00
2023-08-18 07:14:04 -07:00
log ::info! ( " End slot epoch update calculated next epoch:{current_epoch:?} " ) ;
2023-08-11 00:13:31 -07:00
//calculate schedule in a dedicated thread.
2023-09-01 01:07:42 -07:00
let jh = tokio ::task ::spawn_blocking ( {
let move_epoch = current_epoch . clone ( ) ;
move | | {
2023-09-09 02:55:28 -07:00
let schedule = crate ::leader_schedule ::calculate_leader_schedule_from_stake_map ( & stake_map , & vote_map , & move_epoch ) ;
2023-09-02 08:52:28 -07:00
log ::info! ( " End calculate leader schedule at slot:{} " , current_slot . confirmed_slot ) ;
2023-09-09 02:55:28 -07:00
TaskResult ::ScheduleResult ( schedule . ok ( ) , stake_map , vote_map )
2023-09-01 01:07:42 -07:00
}
2023-08-11 00:13:31 -07:00
} ) ;
spawned_task_result . push ( jh ) ;
spawned_task_toexec . push ( futures ::future ::ready ( TaskToExec ::RpcGetCurrentEpoch ) ) ;
2023-09-06 04:50:36 -07:00
//reload current Stake account a epoch change to synchronize.
2023-08-11 00:13:31 -07:00
}
}
2023-09-02 08:52:28 -07:00
Some ( UpdateOneof ::BlockMeta ( block_meta ) ) = > {
log ::info! ( " Receive Block Meta at slot: {} " , block_meta . slot ) ;
}
Some ( UpdateOneof ::Block ( block ) ) = > {
println! ( " Receive Block at slot: {} " , block . slot ) ;
}
2023-09-01 01:26:06 -07:00
Some ( UpdateOneof ::Ping ( _ ) ) = > log ::trace! ( " UpdateOneof::Ping " ) ,
2023-08-11 00:13:31 -07:00
bad_msg = > {
log ::info! ( " Geyser stream unexpected message received:{:?} " , bad_msg ) ;
}
}
}
Err ( error ) = > {
log ::error! ( " Geyser stream receive an error has message: {error:?}, try to reconnect and resynchronize. " ) ;
2023-08-30 07:06:11 -07:00
//todo reconnect and resynchronize.
//break;
2023-08-11 00:13:31 -07:00
}
}
}
None = > {
log ::warn! ( " The geyser stream close try to reconnect and resynchronize. " ) ;
2023-08-30 09:21:10 -07:00
let new_confirmed_stream = client
. subscribe_once (
slots . clone ( ) ,
accounts . clone ( ) , //accounts
Default ::default ( ) , //tx
Default ::default ( ) , //entry
Default ::default ( ) , //full block
Default ::default ( ) , //block meta
Some ( CommitmentLevel ::Confirmed ) ,
vec! [ ] ,
)
. await ? ;
confirmed_stream = new_confirmed_stream ;
log ::info! ( " reconnection done " ) ;
//TODO resynchronize.
2023-08-11 00:13:31 -07:00
}
}
}
}
}
2023-08-30 09:21:10 -07:00
//Ok(())
2023-08-11 00:13:31 -07:00
}
2023-09-11 09:27:21 -07:00
#[ derive(Default, Debug, Clone) ]
2023-08-11 00:13:31 -07:00
struct CurrentSlot {
processed_slot : u64 ,
confirmed_slot : u64 ,
finalized_slot : u64 ,
}
impl CurrentSlot {
fn update_slot ( & mut self , slot : & SubscribeUpdateSlot ) {
2023-08-31 06:27:47 -07:00
let updade = | commitment : & str , current_slot : & mut u64 , new_slot : u64 | {
2023-08-31 05:24:16 -07:00
//verify that the slot is consecutif
if * current_slot ! = 0 & & new_slot ! = * current_slot + 1 {
2023-09-01 01:07:42 -07:00
log ::trace! (
2023-08-31 06:27:47 -07:00
" At {commitment} not consecutif slot send: current_slot:{} new_slot{} " ,
2023-08-31 05:24:16 -07:00
current_slot ,
new_slot
) ;
}
* current_slot = new_slot
} ;
2023-08-11 00:13:31 -07:00
match slot . status ( ) {
2023-08-31 06:27:47 -07:00
CommitmentLevel ::Processed = > updade ( " Processed " , & mut self . processed_slot , slot . slot ) ,
2023-08-31 05:24:16 -07:00
2023-08-31 06:27:47 -07:00
CommitmentLevel ::Confirmed = > updade ( " Confirmed " , & mut self . confirmed_slot , slot . slot ) ,
2023-08-31 05:24:16 -07:00
2023-08-31 06:27:47 -07:00
CommitmentLevel ::Finalized = > updade ( " Finalized " , & mut self . finalized_slot , slot . slot ) ,
2023-08-11 00:13:31 -07:00
}
}
}
#[ derive(Debug) ]
#[ allow(dead_code) ]
pub struct AccountPretty {
is_startup : bool ,
slot : u64 ,
pubkey : Pubkey ,
lamports : u64 ,
owner : Pubkey ,
executable : bool ,
rent_epoch : u64 ,
data : Vec < u8 > ,
write_version : u64 ,
txn_signature : String ,
}
impl AccountPretty {
fn read_stake ( & self ) -> anyhow ::Result < Option < Delegation > > {
2023-09-08 11:07:43 -07:00
if self . data . is_empty ( ) {
log ::warn! ( " Stake account with empty data. Can't read vote. " ) ;
bail! ( " Error: read Stake account with empty data " ) ;
}
2023-08-11 00:13:31 -07:00
crate ::stakestore ::read_stake_from_account_data ( self . data . as_slice ( ) )
}
2023-08-29 03:16:57 -07:00
fn read_vote ( & self ) -> anyhow ::Result < VoteState > {
if self . data . is_empty ( ) {
log ::warn! ( " Vote account with empty data. Can't read vote. " ) ;
2023-09-08 11:07:43 -07:00
bail! ( " Error: read Vote account with empty data " ) ;
2023-08-29 03:16:57 -07:00
}
Ok ( VoteState ::deserialize ( & self . data ) ? )
}
2023-08-11 00:13:31 -07:00
}
fn read_account (
geyser_account : SubscribeUpdateAccount ,
current_slot : u64 ,
) -> Option < AccountPretty > {
let Some ( inner_account ) = geyser_account . account else {
log ::warn! ( " Receive a SubscribeUpdateAccount without account. " ) ;
return None ;
} ;
if geyser_account . slot ! = current_slot {
2023-09-03 05:40:43 -07:00
log ::trace! (
2023-08-11 00:13:31 -07:00
" Get geyser account on a different slot:{} of the current:{current_slot} " ,
geyser_account . slot
) ;
}
Some ( AccountPretty {
is_startup : geyser_account . is_startup ,
slot : geyser_account . slot ,
pubkey : Pubkey ::try_from ( inner_account . pubkey ) . expect ( " valid pubkey " ) ,
lamports : inner_account . lamports ,
owner : Pubkey ::try_from ( inner_account . owner ) . expect ( " valid pubkey " ) ,
executable : inner_account . executable ,
rent_epoch : inner_account . rent_epoch ,
data : inner_account . data ,
write_version : inner_account . write_version ,
txn_signature : bs58 ::encode ( inner_account . txn_signature . unwrap_or_default ( ) ) . into_string ( ) ,
} )
}
#[ derive(Debug) ]
enum TaskToExec {
2023-09-08 11:07:43 -07:00
RpcGetStakeAccount ( u64 , u64 ) , //epoch_start_slot, sleept time
RpcGetVoteAccount ( u64 , u64 ) , //epoch_start_slot, sleept time
2023-08-11 00:13:31 -07:00
RpcGetCurrentEpoch ,
}
#[ derive(Debug) ]
enum TaskResult {
2023-09-08 11:07:43 -07:00
RpcGetStakeAccount ( Result < Vec < ( Pubkey , Account ) > , ClientError > , u64 ) , //stake_list, vote_list
RpcGetVoteAccount ( Result < Vec < ( Pubkey , Account ) > , ClientError > , u64 ) , //stake_list, vote_list
2023-08-11 00:13:31 -07:00
CurrentEpoch ( Result < EpochInfo , ClientError > ) ,
2023-09-08 11:07:43 -07:00
MergeStakeList ( crate ::stakestore ::StakeMap ) ,
MergeVoteList ( crate ::votestore ::VoteMap ) ,
2023-09-09 02:55:28 -07:00
ScheduleResult (
Option < LeaderSchedule > ,
crate ::stakestore ::StakeMap ,
crate ::votestore ::VoteMap ,
) ,
2023-08-11 00:13:31 -07:00
}