add a timeout before the stake PA update at epoch change

musitdev 2023-09-06 15:59:14 +02:00
parent 7b60d64d6c
commit b11af4107c
1 changed file with 22 additions and 17 deletions


@@ -111,14 +111,16 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
     };
     let mut next_epoch_start_slot =
         current_epoch.slots_in_epoch - current_epoch.slot_index + current_epoch.absolute_slot;
-    let mut current_epoch_start_slot = current_epoch.absolute_slot - current_epoch.slot_index;
-    log::info!("Run_loop init {current_epoch_start_slot} {next_epoch_start_slot} current_epoch:{current_epoch:?}");
+    log::info!("Run_loop init {next_epoch_start_slot} current_epoch:{current_epoch:?}");
     let mut spawned_task_toexec = FuturesUnordered::new();
     let mut spawned_task_result = FuturesUnordered::new();
     //used to set an initial state of all PA
-    spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
+    spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(
+        current_epoch.absolute_slot - current_epoch.slot_index,
+        0,
+    )));
     //subscribe Geyser grpc
     //slot subscription
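
The slot arithmetic in this hunk is what the new RpcGetPa arguments carry, so a standalone restatement may help. A minimal sketch, assuming only the EpochInfo fields already used above; epoch_boundaries is a hypothetical helper, not part of this codebase:

    // Hypothetical helper restating the boundary arithmetic above.
    fn epoch_boundaries(e: &solana_sdk::epoch_info::EpochInfo) -> (u64, u64) {
        // First slot of the current epoch: rewind slot_index slots from now.
        let current_epoch_start = e.absolute_slot - e.slot_index;
        // First slot of the next epoch: the slots left in this epoch, from now.
        let next_epoch_start = e.absolute_slot + (e.slots_in_epoch - e.slot_index);
        (current_epoch_start, next_epoch_start)
    }

The first value is what the initial RpcGetPa task now receives; the second is the next_epoch_start_slot the loop tracks.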
@@ -207,12 +209,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
             Some(to_exec) = spawned_task_toexec.next() => {
                 let jh = tokio::spawn(async move {
                     match to_exec {
-                        TaskToExec::RpcGetPa => {
+                        TaskToExec::RpcGetPa(epoch_start_slot, sleep_time) => {
+                            if sleep_time > 0 {
+                                tokio::time::sleep(Duration::from_secs(sleep_time)).await;
+                            }
                             log::info!("TaskToExec RpcGetPa start");
                             let rpc_client = RpcClient::new_with_timeout_and_commitment(RPC_URL.to_string(), Duration::from_secs(600), CommitmentConfig::finalized());
                             let res = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()).await;
                             log::info!("TaskToExec RpcGetPa END");
-                            TaskResult::RpcGetPa(res)
+                            TaskResult::RpcGetPa(res, epoch_start_slot)
                         },
                         TaskToExec::RpcGetCurrentEpoch => {
                             //TODO remove: no need, the epoch is calculated.
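
The heart of the commit is the new sleep_time guard at the top of the task body. A self-contained sketch of that sleep-then-fetch flow, assuming the same nonblocking solana_client API the diff already uses; fetch_stake_accounts and its rpc_url parameter are illustrative names:

    use std::time::Duration;

    use solana_client::client_error::ClientError;
    use solana_client::nonblocking::rpc_client::RpcClient;
    use solana_sdk::account::Account;
    use solana_sdk::commitment_config::CommitmentConfig;
    use solana_sdk::pubkey::Pubkey;

    async fn fetch_stake_accounts(
        rpc_url: String,
        sleep_time: u64,
    ) -> Result<Vec<(Pubkey, Account)>, ClientError> {
        // New in this commit: optionally wait before querying, e.g. 30s at an
        // epoch change, presumably so the node reflects the new epoch's state.
        if sleep_time > 0 {
            tokio::time::sleep(Duration::from_secs(sleep_time)).await;
        }
        let rpc_client = RpcClient::new_with_timeout_and_commitment(
            rpc_url,
            Duration::from_secs(600), // getProgramAccounts responses are large
            CommitmentConfig::finalized(),
        );
        // Same call as run_loop: every account owned by the stake program.
        rpc_client
            .get_program_accounts(&solana_sdk::stake::program::id())
            .await
    }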
@@ -230,10 +235,10 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
             //Manage RPC call result execution
             Some(some_res) = spawned_task_result.next() => {
                 match some_res {
-                    Ok(TaskResult::RpcGetPa(Ok(pa_list))) => {
+                    Ok(TaskResult::RpcGetPa(Ok(pa_list), epoch_start_slot)) => {
                         let Ok(mut stake_map) = extract_stakestore(&mut stakestore) else {
                             //retry later, the epoch schedule is currently being processed
-                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
+                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(epoch_start_slot, 0)));
                             continue;
                         };
                         //merge new PA with the stake map in a dedicated thread
@@ -241,16 +246,16 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
                         let jh = tokio::task::spawn_blocking(move || {
                             //update pa_list to set the slot update to the epoch start one.
-                            crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list, current_epoch_start_slot);
+                            crate::stakestore::merge_program_account_in_strake_map(&mut stake_map, pa_list, epoch_start_slot);
                             TaskResult::MergePAList(stake_map)
                         });
                         spawned_task_result.push(jh);
                     }
                     //getPA can fail and should be restarted.
-                    Ok(TaskResult::RpcGetPa(Err(err))) => {
+                    Ok(TaskResult::RpcGetPa(Err(err), epoch_start_slot)) => {
                         log::warn!("RPC call getPA return invalid result: {err:?}");
                         //getPA can fail and should be restarted.
-                        spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
+                        spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(epoch_start_slot, 0)));
                     }
                     Ok(TaskResult::CurrentEpoch(Ok(epoch_info))) => {
                         //TODO remove: no need, the epoch is calculated.
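
Both failure branches above recover the same way: push an already-ready future back onto the FuturesUnordered queue so the select! loop re-issues the call on its next turn, still tagged with the epoch_start_slot it was first issued with. A minimal sketch of that requeue pattern; the Task enum and requeue are illustrative:

    use futures::future::{ready, Ready};
    use futures::stream::FuturesUnordered;

    #[derive(Debug)]
    enum Task {
        GetPa(u64, u64), // epoch_start_slot, sleep time in seconds
    }

    // Re-enqueue a retry; sleep time 0 means it runs on the next loop turn.
    fn requeue(queue: &FuturesUnordered<Ready<Task>>, epoch_start_slot: u64) {
        queue.push(ready(Task::GetPa(epoch_start_slot, 0)));
    }

The merge-failure branches further below push RpcGetPa(0, 0) instead, so those retries carry slot 0 as their tag.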
@@ -277,7 +282,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
                             //during PA processing no epoch schedule can be done.
                             log::warn!("merge stake on a non extract stake map err:{err}");
                             //restart the getPA.
-                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
+                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(0, 0)));
                             continue;
                         };
                         log::info!("Run_loop Program account stake merge END");
@@ -306,7 +311,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
                             //during PA processing no epoch schedule can be done.
                             log::warn!("merge stake on a non extract stake map err:{err}");
                             //restart the getPA.
-                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
+                            spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(0, 0)));
                             continue;
                         };
@@ -397,16 +402,17 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
                     continue;
                 };
+                //reload PA accounts at the new epoch start. TODO replace with bootstrap.
+                spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa(next_epoch_start_slot, 30))); //Wait 30s to get the new PA.
                 //change epoch. Change manually, then update using RPC.
                 current_epoch.epoch += 1;
                 current_epoch.slot_index = 1;
-                current_epoch_start_slot = next_epoch_start_slot;
                 next_epoch_start_slot = next_epoch_start_slot + current_epoch.slots_in_epoch; //set to the next epoch.
                 log::info!("End slot epoch update calculated next epoch:{current_epoch:?}");
                 //calculate the schedule in a dedicated thread.
                 let jh = tokio::task::spawn_blocking({
                     let move_epoch = current_epoch.clone();
                     move || {
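
At the epoch boundary the loop now schedules the stake reload 30 seconds out and advances its own EpochInfo copy by hand instead of waiting for an RPC round trip. A sketch of that manual rollover, assuming the same EpochInfo shape; roll_epoch is a hypothetical helper:

    // Hypothetical helper mirroring the manual bump above; the delayed
    // RpcGetPa and the RpcGetCurrentEpoch refresh below correct any drift.
    fn roll_epoch(e: &mut solana_sdk::epoch_info::EpochInfo, next_epoch_start_slot: &mut u64) {
        e.epoch += 1;
        e.slot_index = 1;
        // The following epoch starts one full epoch's worth of slots later.
        *next_epoch_start_slot += e.slots_in_epoch;
    }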
@@ -419,7 +425,6 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
                 spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetCurrentEpoch));
                 //reload current stake accounts at epoch change to synchronize.
-                spawned_task_toexec.push(futures::future::ready(TaskToExec::RpcGetPa));
             }
         }
@@ -561,13 +566,13 @@ fn read_account(
 #[derive(Debug)]
 enum TaskToExec {
-    RpcGetPa,
+    RpcGetPa(u64, u64), //epoch_start_slot, sleep time
     RpcGetCurrentEpoch,
 }
 #[derive(Debug)]
 enum TaskResult {
-    RpcGetPa(Result<Vec<(Pubkey, Account)>, ClientError>),
+    RpcGetPa(Result<Vec<(Pubkey, Account)>, ClientError>, u64),
     CurrentEpoch(Result<EpochInfo, ClientError>),
     MergePAList(crate::stakestore::StakeMap),
     ScheduleResult(Option<LeaderSchedule>, crate::stakestore::StakeMap),
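
The u64 added to both variants is a tag that makes the round trip: the slot passed into TaskToExec::RpcGetPa comes back on TaskResult::RpcGetPa, so a retry or merge always knows which epoch start its account snapshot belongs to. A simplified, self-contained sketch of that round trip, with the RPC payload reduced to () for brevity:

    #[derive(Debug)]
    enum ToExec {
        RpcGetPa(u64, u64), // epoch_start_slot, sleep time
    }

    #[derive(Debug, PartialEq)]
    enum Done {
        RpcGetPa(Result<(), String>, u64), // simplified payload, epoch_start_slot
    }

    // The slot tag is threaded through the task untouched, as in run_loop.
    fn exec(task: ToExec) -> Done {
        match task {
            ToExec::RpcGetPa(epoch_start_slot, _sleep_time) => {
                Done::RpcGetPa(Ok(()), epoch_start_slot)
            }
        }
    }

    fn main() {
        assert_eq!(exec(ToExec::RpcGetPa(4200, 30)), Done::RpcGetPa(Ok(()), 4200));
    }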