add LeaderSchedule RPC and stake history account

This commit is contained in:
musitdev 2023-09-27 18:52:35 +02:00
parent fb2923535a
commit 61a5e2344a
8 changed files with 356 additions and 101 deletions

View File

@ -31,6 +31,7 @@ log = "0.4.17"
tracing-subscriber = "0.3.16"
tokio = { version = "1.*", features = ["full"] }
nom = "7.1.3"
itertools = "0.11.0"
reqwest = "0.11"
serde = "1.0"

View File

@ -43,7 +43,7 @@ pub fn run_bootstrap_events(
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<BootstrapEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
data: &BootstrapData,
data: &mut BootstrapData,
) {
let result = process_bootstrap_event(event, stakestore, votestore, data);
match result {
@ -51,18 +51,23 @@ pub fn run_bootstrap_events(
BootsrapProcessResult::Event(event) => {
run_bootstrap_events(event, bootstrap_tasks, stakestore, votestore, data)
}
BootsrapProcessResult::End => (),
BootsrapProcessResult::End => data.done = true,
}
}
pub enum BootstrapEvent {
InitBootstrap,
BootstrapAccountsFetched(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>),
BootstrapAccountsFetched(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
StoreExtracted(
StakeMap,
VoteMap,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
AccountsMerged(StakeMap, VoteMap),
Exit,
@ -75,6 +80,7 @@ enum BootsrapProcessResult {
}
pub struct BootstrapData {
pub done: bool,
pub current_epoch: u64,
pub next_epoch_start_slot: Slot,
pub sleep_time: u64,
@ -98,8 +104,8 @@ fn process_bootstrap_event(
tokio::time::sleep(Duration::from_secs(sleep_time)).await;
}
match crate::bootstrap::bootstrap_accounts(rpc_url).await {
Ok((stakes, votes)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes)
Ok((stakes, votes, history)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history)
}
Err(err) => {
log::warn!(
@ -112,22 +118,22 @@ fn process_bootstrap_event(
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (extract_stakestore(stakestore), extract_votestore(votestore)) {
(Ok(stake_map), Ok(vote_map)) => BootsrapProcessResult::Event(
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes),
BootstrapEvent::StoreExtracted(stake_map, vote_map, stakes, votes, history),
),
_ => {
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
BootstrapEvent::BootstrapAccountsFetched(stakes, votes)
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history)
});
BootsrapProcessResult::TaskHandle(jh)
}
}
}
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes) => {
BootstrapEvent::StoreExtracted(mut stake_map, mut vote_map, stakes, votes, history) => {
log::info!("BootstrapEvent::StoreExtracted RECV");
//merge the new program accounts (PA) into the stake map and vote map in a dedicated blocking task
let jh = tokio::task::spawn_blocking({
@ -137,6 +143,7 @@ fn process_bootstrap_event(
crate::stakestore::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
history,
0, //with RPC no way to know the slot of the account update. Set to 0.
current_epoch,
);
@ -170,17 +177,34 @@ fn process_bootstrap_event(
async fn bootstrap_accounts(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>), ClientError> {
let future = get_stake_account(rpc_url.clone()).and_then(|stakes| async move {
get_vote_account(rpc_url).await.map(|votes| (stakes, votes))
});
) -> Result<
(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
),
ClientError,
> {
let future = get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| async move {
get_vote_account(rpc_url)
.await
.map(|(votes, rpc_url)| (stakes, votes, rpc_url))
})
.and_then(|(stakes, votes, rpc_url)| async move {
get_stakehistory_account(rpc_url)
.await
.map(|history| (stakes, votes, history))
});
future.await
}
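The chained `and_then` calls run the three fetches strictly in sequence, threading `rpc_url` through each step so it can be reused. If the three queries are independent, a concurrent variant is possible; a minimal sketch, assuming the helpers were changed back to take their own URL clone and return only the account list:

//sketch: concurrent variant, assuming each helper returns Result<Vec<(Pubkey, Account)>, ClientError>.
async fn bootstrap_accounts_concurrent(
    rpc_url: String,
) -> Result<
    (
        Vec<(Pubkey, Account)>,
        Vec<(Pubkey, Account)>,
        Vec<(Pubkey, Account)>,
    ),
    ClientError,
> {
    futures::try_join!(
        get_stake_account(rpc_url.clone()),
        get_vote_account(rpc_url.clone()),
        get_stakehistory_account(rpc_url),
    )
}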
async fn get_stake_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
async fn get_stake_account(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
@ -188,13 +212,15 @@ async fn get_stake_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, Cl
.get_program_accounts(&solana_sdk::stake::program::id())
.await;
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake
res_stake.map(|stake| (stake, rpc_url))
}
async fn get_vote_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
async fn get_vote_account(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
@ -202,5 +228,22 @@ async fn get_vote_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, Cli
.get_program_accounts(&solana_sdk::vote::program::id())
.await;
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote
res_vote.map(|votes| (votes, rpc_url))
}
async fn get_stakehistory_account(rpc_url: String) -> Result<Vec<(Pubkey, Account)>, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client
.get_program_accounts(&solana_sdk::sysvar::stake_history::id())
.await;
log::info!(
"TaskToExec RpcGetStakeHistory END with len:{:?}",
res_stake.as_ref().map(|history| history.len())
);
res_stake
}
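`get_stakehistory_account` reuses the `get_program_accounts` pattern with the stake-history sysvar address as the owner filter. Since the stake history lives in a single well-known sysvar account, a hedged alternative is to fetch that one account directly and decode it with the same `from_account` helper this commit uses later (a sketch, assuming `StakeHistory` is in scope):

//sketch: fetch and decode the single SysvarStakeHistory account directly.
async fn get_stake_history_direct(rpc_url: String) -> anyhow::Result<StakeHistory> {
    let rpc_client = RpcClient::new_with_timeout_and_commitment(
        rpc_url,
        Duration::from_secs(600),
        CommitmentConfig::finalized(),
    );
    let account = rpc_client
        .get_account(&solana_sdk::sysvar::stake_history::id())
        .await?;
    solana_sdk::account::from_account::<StakeHistory, _>(&account)
        .ok_or_else(|| anyhow::anyhow!("can't deserialize the stake history account"))
}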

View File

@ -7,6 +7,18 @@ use solana_sdk::epoch_info::EpochInfo;
use yellowstone_grpc_proto::geyser::CommitmentLevel as GeyserCommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeUpdateSlot;
pub fn get_epoch_for_slot(slot: Slot, current_epoch: &CurrentEpochSlotState) -> u64 {
let slots_in_epoch = current_epoch.current_epoch.slots_in_epoch;
let epoch_start_slot = current_epoch.current_epoch_start_slot();
if slot >= epoch_start_slot {
let slot_distance = (slot - epoch_start_slot) / slots_in_epoch;
current_epoch.current_epoch.epoch + slot_distance
} else {
//ceiling division: any slot before the epoch start belongs to an earlier epoch.
let slot_distance = (epoch_start_slot - slot + slots_in_epoch - 1) / slots_in_epoch;
current_epoch.current_epoch.epoch - slot_distance
}
}
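A quick sanity check of the arithmetic, as a standalone sketch with hypothetical numbers (mainnet-style slots_in_epoch = 432_000):

//standalone mirror of get_epoch_for_slot, with hypothetical values.
fn epoch_for_slot(slot: u64, epoch: u64, epoch_start_slot: u64, slots_in_epoch: u64) -> u64 {
    if slot >= epoch_start_slot {
        epoch + (slot - epoch_start_slot) / slots_in_epoch
    } else {
        //ceiling division: one slot before the start is already in the previous epoch.
        epoch - (epoch_start_slot - slot + slots_in_epoch - 1) / slots_in_epoch
    }
}

fn main() {
    assert_eq!(epoch_for_slot(216_431_999, 500, 216_000_000, 432_000), 500); //last slot of epoch 500
    assert_eq!(epoch_for_slot(216_432_000, 500, 216_000_000, 432_000), 501); //first slot of epoch 501
    assert_eq!(epoch_for_slot(215_999_999, 500, 216_000_000, 432_000), 499); //just before the epoch start
}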
#[derive(Debug)]
pub struct CurrentEpochSlotState {
pub current_slot: CurrentSlot,
@ -31,6 +43,10 @@ impl CurrentEpochSlotState {
})
}
pub fn current_epoch_start_slot(&self) -> Slot {
self.current_epoch.absolute_slot - self.current_epoch.slot_index
}
pub fn current_epoch_end_slot(&self) -> Slot {
self.next_epoch_start_slot - 1
}

View File

@ -2,6 +2,7 @@ use crate::stakestore::{extract_stakestore, merge_stakestore, StakeMap, StakeSto
use crate::votestore::{extract_votestore, merge_votestore, VoteMap, VoteStore};
use anyhow::bail;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
use solana_ledger::leader_schedule::LeaderSchedule;
@ -15,6 +16,7 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
@ -23,18 +25,20 @@ const SCHEDULE_STAKE_BASE_FILE_NAME: &str = "aggregate_export_votestake_";
//u64::MAX, Solana's sentinel epoch value (a bootstrap delegation's activation_epoch).
pub const MAX_EPOCH_VALUE: u64 = 18446744073709551615;
#[derive(Debug, Default)]
pub struct CalculatedSchedule {
pub current: Option<LeaderScheduleData>,
pub next: Option<LeaderScheduleData>,
}
#[derive(Debug)]
pub struct LeaderScheduleData {
pub schedule: LeaderSchedule,
pub schedule: Arc<HashMap<String, Vec<usize>>>,
pub vote_stakes: Vec<(Pubkey, u64)>,
pub epoch: u64,
}
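Switching the `schedule` field from a `LeaderSchedule` to an `Arc<HashMap<String, Vec<usize>>>` lets the RPC path at the bottom of this commit hand the schedule to a response channel with a reference-count bump instead of a deep copy of the map. A standalone illustration:

use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    let schedule: Arc<HashMap<String, Vec<usize>>> =
        Arc::new(HashMap::from([("leader_pk".to_string(), vec![0, 1, 2, 3])]));
    //cheap clone: copies the Arc pointer, not the underlying map.
    let for_rpc = Arc::clone(&schedule);
    assert_eq!(Arc::strong_count(&schedule), 2);
    assert_eq!(for_rpc["leader_pk"], vec![0, 1, 2, 3]);
}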
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct EpochStake {
epoch: u64,
stakes: Vec<(Pubkey, u64)>,
@ -72,12 +76,12 @@ pub fn bootstrap_leader_schedule(
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule: current_schedule,
schedule: Arc::new(current_schedule),
vote_stakes: current_stakes.stakes,
epoch: current_stakes.epoch,
}),
next: Some(LeaderScheduleData {
schedule: next_schedule,
schedule: Arc::new(next_schedule),
vote_stakes: next_stakes.stakes,
epoch: next_stakes.epoch,
}),
@ -144,7 +148,7 @@ pub fn run_leader_schedule_events(
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> Option<(
anyhow::Result<(LeaderSchedule, Vec<(Pubkey, u64)>)>,
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
)> {
let result = process_leadershedule_event(event, stakestore, votestore);
@ -166,7 +170,7 @@ pub enum LeaderScheduleEvent {
MergeStoreAndSaveSchedule(
StakeMap,
VoteMap,
anyhow::Result<(LeaderSchedule, Vec<(Pubkey, u64)>)>,
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
),
}
@ -175,7 +179,7 @@ enum LeaderScheduleResult {
TaskHandle(JoinHandle<LeaderScheduleEvent>),
Event(LeaderScheduleEvent),
End(
anyhow::Result<(LeaderSchedule, Vec<(Pubkey, u64)>)>,
anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)>,
EpochInfo,
),
}
@ -214,15 +218,13 @@ fn process_leadershedule_event(
&schedule_epoch,
);
if let Ok((_, vote_stakes)) = &schedule_and_stakes {
let filename = format!(
"{SCHEDULE_STAKE_BASE_FILE_NAME}_{}.json",
schedule_epoch.epoch
);
if let Err(err) =
save_schedule_vote_stakes(&filename, vote_stakes, schedule_epoch.epoch)
{
if let Err(err) = save_schedule_vote_stakes(
SCHEDULE_STAKE_BASE_FILE_NAME,
vote_stakes,
schedule_epoch.epoch,
) {
log::error!(
"Error during new schedule of epoch:{} saving in file:{filename} error:{err}",
"Error during saving in file of the new schedule of epoch:{} error:{err}",
schedule_epoch.epoch
);
}
@ -267,7 +269,7 @@ fn calculate_leader_schedule_from_stake_map(
stake_map: &crate::stakestore::StakeMap,
vote_map: &crate::votestore::VoteMap,
current_epoch_info: &EpochInfo,
) -> anyhow::Result<(LeaderSchedule, Vec<(Pubkey, u64)>)> {
) -> anyhow::Result<(HashMap<String, Vec<usize>>, Vec<(Pubkey, u64)>)> {
let mut stakes = HashMap::<Pubkey, u64>::new();
log::trace!(
"calculate_leader_schedule_from_stake_map vote map len:{} stake map len:{}",
@ -354,7 +356,7 @@ fn calculate_leader_schedule(
stakes: &mut Vec<(Pubkey, u64)>,
epoch: u64,
slots_in_epoch: u64,
) -> anyhow::Result<LeaderSchedule> {
) -> anyhow::Result<HashMap<String, Vec<usize>>> {
if stakes.is_empty() {
bail!("calculate_leader_schedule stakes list is empty. no schedule can be calculated.");
}
@ -362,12 +364,17 @@ fn calculate_leader_schedule(
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch:?}");
Ok(LeaderSchedule::new(
&stakes,
seed,
slots_in_epoch,
NUM_CONSECUTIVE_LEADER_SLOTS,
))
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
//pivot the per-slot leader list into a per-leader list of slot indexes.
//into_group_map already yields the HashMap<String, Vec<usize>> we return.
let slot_schedule = schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map();
Ok(slot_schedule)
}
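`into_group_map`, from the itertools dependency added in this commit, does the pivot from the per-slot leader list to the per-leader slot list that the RPC returns. A minimal illustration with made-up leader names:

use itertools::Itertools;
use std::collections::HashMap;

fn main() {
    //index -> leader, as produced by schedule.get_slot_leaders()
    let slot_leaders = ["A", "B", "A", "B"];
    let by_leader: HashMap<String, Vec<usize>> = slot_leaders
        .iter()
        .enumerate()
        .map(|(i, pk)| (pk.to_string(), i))
        .into_group_map();
    assert_eq!(by_leader["A"], vec![0, 2]);
    assert_eq!(by_leader["B"], vec![1, 3]);
}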
// Cribbed from leader_schedule_utils

View File

@ -11,6 +11,7 @@
}
'
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
@ -18,7 +19,28 @@ curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
"method": "bootstrap_accounts",
"params": []
}
' -o extract_stake_532_agg.json
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id" : 1,
"method": "stake_accounts",
"params": []
}
' -o extract_stake_529_end.json
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id" : 1,
"method": "getLeaderSchedule",
"params": []
}
'
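# hypothetical variant: getLeaderSchedule also accepts an optional target slot as its single param
curl http://localhost:3000 -X POST -H "Content-Type: application/json" -d '
{
"jsonrpc": "2.0",
"id" : 1,
"method": "getLeaderSchedule",
"params": [216432000]
}
'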
*/
//TODO: add stake verification that the stake is not already deactivated.
@ -31,11 +53,17 @@ use crate::votestore::VoteStore;
use anyhow::bail;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_sdk::account::Account;
use solana_sdk::account::AccountSharedData;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake_history::StakeHistory;
use solana_sdk::sysvar::rent::Rent;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tokio::time::Duration;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
@ -137,11 +165,33 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let mut votestore = VoteStore::new(VOTESTORE_INITIAL_CAPACITY);
//leader schedule
let mut leader_schedule_data = crate::leader_schedule::bootstrap_leader_schedule(
CURRENT_SCHEDULE_VOTE_STAKES_FILE,
NEXT_SCHEDULE_VOTE_STAKES_FILE,
let args: Vec<String> = env::args().collect();
log::info!("agrs:{:?}", args);
let (schedule_current_file, schedule_next_file) = if args.len() == 3 {
let current_path: String = args[1]
.parse()
.unwrap_or(CURRENT_SCHEDULE_VOTE_STAKES_FILE.to_string());
let next_path: String = args[2]
.parse()
.unwrap_or(NEXT_SCHEDULE_VOTE_STAKES_FILE.to_string());
(current_path, next_path)
} else {
(
CURRENT_SCHEDULE_VOTE_STAKES_FILE.to_string(),
NEXT_SCHEDULE_VOTE_STAKES_FILE.to_string(),
)
};
let mut leader_schedule_data = match crate::leader_schedule::bootstrap_leader_schedule(
&schedule_current_file,
&schedule_next_file,
current_epoch_state.current_epoch.slots_in_epoch,
)?;
) {
Ok(data) => data,
Err(err) => {
log::warn!("Can't load vote stakes using these files: current:{schedule_current_file} next:{schedule_next_file}. Error:{err}");
crate::leader_schedule::CalculatedSchedule::default()
}
};
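The two optional CLI arguments override the default vote-stake snapshot files; a hypothetical invocation (file names assumed, not from this commit):

cargo run -- schedule_current.json schedule_next.json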
//future execution collection.
let mut spawned_bootstrap_task = FuturesUnordered::new();
@ -161,6 +211,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
owner: vec![
solana_sdk::stake::program::ID.to_string(),
solana_sdk::vote::program::ID.to_string(),
solana_sdk::sysvar::stake_history::ID.to_string(),
// solana_sdk::system_program::ID.to_string(),
],
filters: vec![],
@ -208,7 +259,8 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
tokio::spawn(rpc_handle.stopped());
//Init bootstrap process
let bootstrap_data = BootstrapData {
let mut bootstrap_data = BootstrapData {
done: false,
current_epoch: current_epoch_state.current_epoch.epoch,
next_epoch_start_slot: current_epoch_state.next_epoch_start_slot,
sleep_time: 1,
@ -247,15 +299,21 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
});
}
crate::rpc::Requests::BootstrapAccounts(tx) => {
log::info!("RPC start save_stakes");
crate::rpc::Requests::GetStakestore(tx) => {
let current_stakes = stakestore.get_cloned_stake_map();
if let Err(err) = tx.send((current_stakes, current_epoch_state.current_slot.confirmed_slot)){
println!("Channel error during sending bacl request status error:{err:?}");
println!("Channel error during sending back request status error:{err:?}");
}
log::info!("RPC bootstrap account send");
log::info!("RPC GetStakestore account send");
},
_ => crate::rpc::server_rpc_request(req, &current_epoch_state),
crate::rpc::Requests::GetVotestore(tx) => {
let current_votes = votestore.get_cloned_vote_map();
if let Err(err) = tx.send((current_votes, current_epoch_state.current_slot.confirmed_slot)){
println!("Channel error during sending back request status error:{err:?}");
}
log::info!("RPC GetVotestore account send");
},
_ => crate::rpc::server_rpc_request(req, &current_epoch_state, &leader_schedule_data),
}
},
@ -268,7 +326,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//exec bootstrap task
Some(Ok(event)) = spawned_bootstrap_task.next() => {
crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, &bootstrap_data);
crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, &mut bootstrap_data);
}
//Manage RPC call result execution
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
@ -283,7 +341,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
match new_schedule {
Ok((new_schedule, new_stakes)) => {
leader_schedule_data.next = Some(LeaderScheduleData {
schedule: new_schedule,
schedule: Arc::new(new_schedule),
vote_stakes: new_stakes,
epoch: epoch.epoch,
});
@ -327,7 +385,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
//log::trace!("Geyser receive new account");
match account.owner {
solana_sdk::stake::program::ID => {
log::info!("Geyser Notif stake account:{}", account);
log::info!("Geyser notif stake account:{}", account);
if let Err(err) = stakestore.notify_stake_change(
account,
current_epoch_state.current_epoch_end_slot(),
@ -344,6 +402,9 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
continue;
}
}
solana_sdk::sysvar::stake_history::ID => {
log::info!("Geyser notif History account:{}", account);
}
solana_sdk::system_program::ID => {
log::info!("system_program account:{}",account.pubkey);
}
@ -367,13 +428,15 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
// );
let schedule_event = current_epoch_state.process_new_slot(&slot);
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
if bootstrap_data.done {
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
}
}
}
@ -394,10 +457,11 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
let source_bytes: [u8; 64] = notif_tx.signature[..solana_sdk::signature::SIGNATURE_BYTES]
.try_into()
.unwrap();
log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{}"
log::info!("New stake Tx sign:{} at block slot:{:?} current_slot:{} accounts:{:?}"
, solana_sdk::signature::Signature::from(source_bytes).to_string()
, block.slot
, current_epoch_state.current_slot.confirmed_slot
, instruction.accounts
);
let program_index = instruction.program_id_index;
crate::stakestore::process_stake_tx_message(
@ -430,23 +494,24 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
}
None => {
log::warn!("The geyser stream close try to reconnect and resynchronize.");
log::error!("The geyser stream close try to reconnect and resynchronize.");
break;
//TODO call same initial code.
let new_confirmed_stream = client
.subscribe_once(
slots.clone(),
accounts.clone(), //accounts
Default::default(), //tx
Default::default(), //entry
blocks.clone(), //full block
Default::default(), //block meta
Some(CommitmentLevel::Confirmed),
vec![],
)
.await?;
// let new_confirmed_stream = client
// .subscribe_once(
// slots.clone(),
// accounts.clone(), //accounts
// Default::default(), //tx
// Default::default(), //entry
// blocks.clone(), //full block
// Default::default(), //block meta
// Some(CommitmentLevel::Confirmed),
// vec![],
// )
// .await?;
confirmed_stream = new_confirmed_stream;
log::info!("reconnection done");
// confirmed_stream = new_confirmed_stream;
// log::info!("reconnection done");
//TODO resynchronize.
}
@ -455,7 +520,7 @@ async fn run_loop<F: Interceptor>(mut client: GeyserGrpcClient<F>) -> anyhow::Re
}
}
//Ok(())
Ok(())
}
#[derive(Debug)]
@ -481,6 +546,7 @@ impl AccountPretty {
}
crate::stakestore::read_stake_from_account_data(self.data.as_slice())
}
fn read_vote(&self) -> anyhow::Result<VoteState> {
if self.data.is_empty() {
log::warn!("Vote account with empty data. Can't read vote.");
@ -488,6 +554,16 @@ impl AccountPretty {
}
Ok(VoteState::deserialize(&self.data)?)
}
fn read_stake_history(&self) -> Option<StakeHistory> {
// if self.data.is_empty() {
// log::warn!("Stake history account with empty data. Can't read StakeHistory.");
// bail!("Error: read StakeHistory account with empty data");
// }
solana_sdk::account::from_account::<StakeHistory, _>(&program_account(&self.data))
// Ok(StakeHistory::deserialize(&self.data)?)
}
}
impl std::fmt::Display for AccountPretty {
@ -531,3 +607,13 @@ fn read_account(
txn_signature: bs58::encode(inner_account.txn_signature.unwrap_or_default()).into_string(),
})
}
//wrap raw geyser account data in an AccountSharedData so from_account can read it.
//lamports/owner/executable are placeholders: from_account only deserializes the data field.
fn program_account(program_data: &[u8]) -> AccountSharedData {
AccountSharedData::from(Account {
lamports: Rent::default().minimum_balance(program_data.len()).min(1),
data: program_data.to_vec(),
owner: solana_sdk::bpf_loader::id(),
executable: true,
rent_epoch: 0,
})
}
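Once decoded, the stake history is a per-epoch series of cluster-wide stake totals. A hedged sketch of reading one entry, with `get` and the field names taken from `solana_sdk::stake_history`:

use solana_sdk::stake_history::{StakeHistory, StakeHistoryEntry};

//sketch: log the cluster-wide stake numbers recorded for a given epoch.
fn log_epoch_entry(history: &StakeHistory, epoch: u64) {
    match history.get(epoch) {
        Some(StakeHistoryEntry { effective, activating, deactivating }) => {
            log::info!("epoch:{epoch} effective:{effective} activating:{activating} deactivating:{deactivating}");
        }
        None => log::info!("epoch:{epoch} has no stake history entry"),
    }
}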

View File

@ -1,6 +1,9 @@
use crate::epoch::CurrentEpochSlotState;
use crate::leader_schedule::CalculatedSchedule;
use crate::stakestore::StakeMap;
use crate::stakestore::StoredStake;
use crate::votestore::{StoredVote, VoteMap};
use crate::LeaderScheduleData;
use crate::Slot;
use jsonrpsee::core::Error as JsonRpcError;
use jsonrpsee::proc_macros::rpc;
@ -10,6 +13,7 @@ use solana_client::rpc_config::RpcContextConfig;
use solana_client::rpc_response::RpcBlockhash;
use solana_client::rpc_response::RpcVoteAccountStatus;
use solana_sdk::commitment_config::CommitmentConfig;
use std::sync::Arc;
//use solana_rpc_client_api::response::Response as RpcResponse;
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;
@ -74,12 +78,6 @@ pub trait ConsensusRpc {
#[method(name = "getEpochInfo")]
async fn get_epoch_info(&self) -> Result<EpochInfo>;
#[method(name = "getLeaderSchedule")]
async fn get_leader_schedule(
&self,
slot: Option<u64>,
) -> Result<Option<HashMap<String, Vec<usize>>>>;
#[method(name = "getVoteAccounts")]
async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus>;
}
@ -112,13 +110,6 @@ impl ConsensusRpcServer for RPCServer {
Ok(rx.await?)
}
async fn get_leader_schedule(
&self,
slot: Option<u64>,
) -> Result<Option<HashMap<String, Vec<usize>>>> {
todo!()
}
async fn get_vote_accounts(&self) -> Result<RpcVoteAccountStatus> {
todo!()
}
@ -126,15 +117,24 @@ impl ConsensusRpcServer for RPCServer {
pub enum Requests {
SaveStakes,
BootstrapAccounts(tokio::sync::oneshot::Sender<(StakeMap, Slot)>),
GetStakestore(tokio::sync::oneshot::Sender<(StakeMap, Slot)>),
GetVotestore(tokio::sync::oneshot::Sender<(VoteMap, Slot)>),
EpochInfo(tokio::sync::oneshot::Sender<EpochInfo>),
Slot(
tokio::sync::oneshot::Sender<Option<Slot>>,
Option<RpcContextConfig>,
),
LeaderSchedule(
tokio::sync::oneshot::Sender<Option<Arc<HashMap<String, Vec<usize>>>>>,
Option<u64>,
),
}
pub fn server_rpc_request(request: Requests, current_epoch_state: &CurrentEpochSlotState) {
pub fn server_rpc_request(
request: Requests,
current_epoch_state: &CurrentEpochSlotState,
leader_schedules: &CalculatedSchedule,
) {
match request {
crate::rpc::Requests::EpochInfo(tx) => {
if let Err(err) = tx.send(current_epoch_state.current_epoch.clone()) {
@ -152,6 +152,28 @@ pub fn server_rpc_request(request: Requests, current_epoch_state: &CurrentEpochS
log::warn!("Channel error during sending back request status error:{err:?}");
}
}
crate::rpc::Requests::LeaderSchedule(tx, slot) => {
let slot = slot.unwrap_or_else(|| current_epoch_state.current_slot.confirmed_slot);
let epoch = crate::epoch::get_epoch_for_slot(slot, current_epoch_state);
log::info!(
"Requests::LeaderSchedule slot:{slot} epoch:{epoch} current epoch:{}",
current_epoch_state.current_epoch.epoch
);
//currently only returns the schedule for the current or next epoch.
let get_schedule_fn = |schedule: &LeaderScheduleData| {
(schedule.epoch == epoch).then(|| schedule.schedule.clone()) //Arc clone.
};
let schedule = leader_schedules
.current
.as_ref()
.and_then(get_schedule_fn)
.or_else(|| leader_schedules.next.as_ref().and_then(get_schedule_fn));
if let Err(err) = tx.send(schedule) {
log::warn!("Channel error during sending back request status error:{err:?}");
}
}
_ => unreachable!(),
}
}
@ -165,7 +187,30 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
.await?;
let mut module = RpcModule::new(request_tx);
//register start Batch Tx send entry point
//Public RPC method
//process the RPC call this way to avoid cloning the schedule content.
//in the trait version, we would have to return the full data, which is then serialized to JSON.
module.register_async_method("getLeaderSchedule", |params, request_tx| async move {
log::info!("RPC getLeaderSchedule");
let slot: Option<u64> = params.one().unwrap_or(None);
let (tx, rx) = oneshot::channel();
if let Err(err) = request_tx.send(Requests::LeaderSchedule(tx, slot)).await {
return serde_json::Value::String(format!("error during query processing:{err}",));
}
rx.await
.map_err(|err| format!("error during bootstrap query processing:{err}"))
.and_then(|schedule| {
serde_json::to_value(&schedule).map_err(|err| {
format!(
"error during json serialisation:{}",
JsonRpcError::ParseError(err)
)
})
})
.unwrap_or_else(|err| serde_json::Value::String(err.to_string()))
})?;
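On success the channel payload serializes to a plain pubkey-to-slot-indices map; `null` is returned when no schedule matches the requested epoch. A hypothetical, truncated response:

{"7Np41oeYqPefeNQEHSv1UDhYrehxin3NStELsSKCT4K2": [0, 1, 2, 3, 148, 149, 150, 151]}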
//Test RPC method
module.register_async_method("save_stakes", |_params, request_tx| async move {
log::trace!("RPC save_stakes");
request_tx
@ -177,7 +222,7 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
module.register_async_method("bootstrap_accounts", |_params, request_tx| async move {
log::trace!("RPC bootstrap_accounts");
let (tx, rx) = oneshot::channel();
if let Err(err) = request_tx.send(Requests::BootstrapAccounts(tx)).await {
if let Err(err) = request_tx.send(Requests::GetStakestore(tx)).await {
return serde_json::Value::String(format!("error during query processing:{err}",));
}
rx.await
@ -198,6 +243,30 @@ pub(crate) async fn run_server(request_tx: Sender<Requests>) -> Result<ServerHan
})
.unwrap_or_else(|err| serde_json::Value::String(err.to_string()))
})?;
module.register_async_method("stake_accounts", |_params, request_tx| async move {
log::trace!("RPC bootstrap_accounts");
let (tx, rx) = oneshot::channel();
if let Err(err) = request_tx.send(Requests::GetVotestore(tx)).await {
return serde_json::Value::String(format!("error during query processing:{err}",));
}
rx.await
.map_err(|err| format!("error during bootstrap query processing:{err}"))
.and_then(|(accounts, slot)| {
println!("RPC end request status");
//replace Pubkey with String: JSON only allows string dictionary keys.
let ret: HashMap<String, StoredVote> = accounts
.into_iter()
.map(|(pk, acc)| (pk.to_string(), acc))
.collect();
serde_json::to_value((slot, ret)).map_err(|err| {
format!(
"error during json serialisation:{}",
JsonRpcError::ParseError(err)
)
})
})
.unwrap_or_else(|err| serde_json::Value::String(err.to_string()))
})?;
let server_handle = server.start(module);
Ok(server_handle)
}

View File

@ -5,10 +5,12 @@ use borsh::BorshDeserialize;
use serde::{Deserialize, Serialize};
use serde_json::json;
use solana_sdk::account::Account;
use solana_sdk::account::AccountSharedData;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::instruction::StakeInstruction;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use solana_sdk::stake_history::StakeHistory;
use std::collections::HashMap;
use tokio::sync::mpsc::Sender;
use yellowstone_grpc_proto::solana::storage::confirmed_block::CompiledInstruction;
@ -50,10 +52,11 @@ fn stake_map_notify_stake(map: &mut StakeMap, stake: StoredStake, current_epoch:
if strstake.last_update_slot <= stake.last_update_slot {
if stake.is_removed(current_epoch) {
log::info!(
"stake_map_notify_stake Stake store insert stake: {} stake:{stake:?}",
"stake_map_notify_stake Stake store remove stake: {} stake:{stake:?} current_epoch:{current_epoch}",
stake.pubkey
);
map.remove(&stake.pubkey);
//TODO: activate once the removal algorithm is validated.
//map.remove(&stake.pubkey);
} else {
log::info!("stake_map_notify_stake Stake store updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey);
*strstake = stake;
@ -124,6 +127,8 @@ impl StoredStake {
self.stake.activation_epoch == crate::leader_schedule::MAX_EPOCH_VALUE
|| self.stake.deactivation_epoch >= current_epoch
}
fn add_history(&mut self) {}
}
#[derive(Debug, Default)]
pub struct StakeStore {
@ -275,11 +280,14 @@ impl StakeStore {
pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap,
pa_list: Vec<(Pubkey, Account)>,
stakes_list: Vec<(Pubkey, Account)>,
stakehistory_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
current_epoch: u64,
) {
pa_list
let mut stake_history_map: HashMap<Pubkey, Account> = stakehistory_list.into_iter().collect();
stakes_list
.into_iter()
.filter_map(
|(pk, account)| match read_stake_from_account_data(&account.data) {
@ -298,8 +306,24 @@ pub fn merge_program_account_in_strake_map(
last_update_slot,
write_version: 0,
};
if let Some(history) = stake_history_map.remove(&stake.pubkey) {
log::info!(
"merge_program_account_in_strake_map found stake history for account:{}",
stake.pubkey
);
//TODO: apply the decoded history to the stake's activation state; the result is unused for now.
let _stake_history = read_historystake_from_account(history);
}
stake_map_notify_stake(stake_map, stake, current_epoch);
});
log::info!(
"merge_program_account_in_strake_map history account not processed:{}",
stake_history_map.len()
);
}
pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<Delegation>> {
@ -315,6 +339,10 @@ pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<De
}
}
pub fn read_historystake_from_account(account: Account) -> Option<StakeHistory> {
solana_sdk::account::from_account::<StakeHistory, _>(&AccountSharedData::from(account))
}
pub async fn start_stake_verification_loop(
rpc_url: String,
) -> Sender<(String, Pubkey, Option<StoredStake>)> {

View File

@ -1,6 +1,7 @@
use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState;
@ -38,7 +39,7 @@ pub fn merge_votestore(votestore: &mut VoteStore, vote_map: VoteMap) -> anyhow::
Ok(())
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StoredVote {
pub pubkey: Pubkey,
pub vote_data: VoteState,
@ -62,6 +63,10 @@ impl VoteStore {
}
}
pub fn get_cloned_vote_map(&self) -> VoteMap {
self.votes.clone()
}
//return the contained vote map to do an external update.
//During the extract period (between extract and merge), added votes are stored to be processed later.
//if the store is already extracted, return an error.