first impl for RPC get_vote_accounts

This commit is contained in:
musitdev 2023-10-25 19:26:59 +02:00
parent 7fba107165
commit b3317667a6
11 changed files with 752 additions and 282 deletions

View File

@ -1,7 +1,37 @@
use crate::stores::block_information_store::BlockInformation;
use crate::stores::data_cache::DataCache;
use solana_rpc_client_api::config::RpcGetVoteAccountsConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::ParsePubkeyError;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Clone)]
pub struct GetVoteAccountsConfig {
pub vote_pubkey: Option<Pubkey>,
pub commitment: Option<CommitmentConfig>,
pub keep_unstaked_delinquents: Option<bool>,
pub delinquent_slot_distance: Option<u64>,
}
impl TryFrom<RpcGetVoteAccountsConfig> for GetVoteAccountsConfig {
type Error = ParsePubkeyError;
fn try_from(config: RpcGetVoteAccountsConfig) -> Result<Self, Self::Error> {
let vote_pubkey = config
.vote_pubkey
.as_ref()
.map(|pk| Pubkey::from_str(pk))
.transpose()?;
Ok(GetVoteAccountsConfig {
vote_pubkey,
commitment: config.commitment,
keep_unstaked_delinquents: config.keep_unstaked_delinquents,
delinquent_slot_distance: config.delinquent_slot_distance,
})
}
}
#[derive(Clone, Default)]
pub struct CalculatedSchedule {
@ -31,7 +61,7 @@ impl CalculatedSchedule {
let get_schedule = |schedule_data: Option<&LeaderScheduleData>| {
schedule_data.and_then(|current| {
(current.epoch == epoch.epoch).then_some(current.schedule.clone())
(current.epoch == epoch.epoch).then_some(current.schedule_by_node.clone())
})
};
get_schedule(self.current.as_ref()).or_else(|| get_schedule(self.next.as_ref()))
@ -40,6 +70,7 @@ impl CalculatedSchedule {
#[derive(Clone)]
pub struct LeaderScheduleData {
pub schedule: HashMap<String, Vec<usize>>,
pub schedule_by_node: HashMap<String, Vec<usize>>,
pub schedule_by_slot: Vec<Pubkey>,
pub epoch: u64,
}

View File

@ -55,6 +55,7 @@ Method calls:
##### Cluster info Domain
- [getclusternodes](https://docs.solana.com/api/http#getclusternodes) not in geyser plugin can be get from gossip. Try to update gyser first.
##### Validator Domain
- [getslot](https://docs.solana.com/api/http#getslot) Need top add 2 new commitment level for first shred seen and half confirm (1/3 of the stake has voted on the block)
- [getBlockHeight](https://docs.solana.com/api/http#getblockheight)

View File

@ -3,6 +3,8 @@ use crate::{
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_rpc_client_api::config::RpcGetVoteAccountsConfig;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;
@ -485,4 +487,20 @@ impl LiteRpcServer for LiteBridge {
.await;
Ok(schedule)
}
async fn get_slot_leaders(
&self,
start_slot: u64,
limit: u64,
) -> crate::rpc::Result<Vec<Pubkey>> {
todo!()
}
async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus> {
let config: GetVoteAccountsConfig =
GetVoteAccountsConfig::try_from(config.unwrap_or_default())?;
todo!();
}
}

View File

@ -1,6 +1,7 @@
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use solana_rpc_client_api::config::RpcGetVoteAccountsConfig;
use solana_rpc_client_api::config::{
RpcBlockConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper,
RpcContextConfig, RpcEncodingConfigWrapper, RpcEpochConfig, RpcGetVoteAccountsConfig,
@ -13,9 +14,9 @@ use solana_rpc_client_api::response::{
RpcContactInfo, RpcLeaderSchedule, RpcPerfSample, RpcPrioritizationFee, RpcVersionInfo,
RpcVoteAccountStatus,
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use std::collections::HashMap;
@ -238,4 +239,17 @@ pub trait LiteRpc {
slot: Option<u64>,
config: Option<RpcLeaderScheduleConfig>,
) -> crate::rpc::Result<Option<HashMap<String, Vec<usize>>>>;
#[method(name = "getSlotLeaders")]
async fn get_slot_leaders(
&self,
start_slot: u64,
limit: u64,
) -> crate::rpc::Result<Vec<Pubkey>>;
#[method(name = "getVoteAccounts")]
async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus>;
}

View File

@ -1,9 +1,13 @@
use crate::epoch::ScheduleEpochData;
use crate::leader_schedule::LeaderScheduleGeneratedData;
use crate::stake::StakeMap;
use crate::stake::StakeStore;
use crate::utils::{Takable, TakeResult};
use crate::vote::EpochVoteStakes;
use crate::vote::VoteMap;
use crate::vote::VoteStore;
use anyhow::bail;
use futures::future::join_all;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::rpc_client::RpcClient;
@ -71,17 +75,22 @@ pub fn run_bootstrap_events(
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<BootstrapEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> anyhow::Result<Option<bool>> {
let result = process_bootstrap_event(event, stakestore, votestore);
slots_in_epoch: u64,
) -> anyhow::Result<Option<anyhow::Result<CalculatedSchedule>>> {
let result = process_bootstrap_event(event, stakestore, votestore, slots_in_epoch);
match result {
BootsrapProcessResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
Ok(None)
}
BootsrapProcessResult::Event(event) => {
run_bootstrap_events(event, bootstrap_tasks, stakestore, votestore)
}
BootsrapProcessResult::End => Ok(Some(true)),
BootsrapProcessResult::Event(event) => run_bootstrap_events(
event,
bootstrap_tasks,
stakestore,
votestore,
slots_in_epoch,
),
BootsrapProcessResult::End(leader_schedule_result) => Ok(Some(leader_schedule_result)),
BootsrapProcessResult::Error(err) => bail!(err),
}
}
@ -105,7 +114,13 @@ pub enum BootstrapEvent {
Account,
String,
),
AccountsMerged(StakeMap, Option<StakeHistory>, VoteMap, String),
AccountsMerged(
StakeMap,
Option<StakeHistory>,
VoteMap,
String,
anyhow::Result<CalculatedSchedule>,
),
Exit,
}
@ -114,13 +129,14 @@ enum BootsrapProcessResult {
TaskHandle(JoinHandle<BootstrapEvent>),
Event(BootstrapEvent),
Error(String),
End,
End(anyhow::Result<CalculatedSchedule>),
}
fn process_bootstrap_event(
event: BootstrapEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap {
@ -148,24 +164,30 @@ fn process_bootstrap_event(
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (
StakeStore::take_stakestore(stakestore),
VoteStore::take_votestore(votestore),
) {
(Ok((stake_map, _)), Ok(vote_map)) => {
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map(((stake_map, _), vote_map)) => {
BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted(
stake_map, vote_map, stakes, votes, history, rpc_url,
))
}
_ => {
let jh = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url)
TakeResult::Taken(stake_notify) => {
let notif_jh = tokio::spawn({
async move {
let notifs = stake_notify
.iter()
.map(|n| n.notified())
.collect::<Vec<tokio::sync::futures::Notified>>();
join_all(notifs).await;
BootstrapEvent::BootstrapAccountsFetched(
stakes, votes, history, rpc_url,
)
}
});
BootsrapProcessResult::TaskHandle(jh)
BootsrapProcessResult::TaskHandle(notif_jh)
}
}
}
BootstrapEvent::StoreExtracted(
mut stake_map,
mut vote_map,
@ -198,18 +220,33 @@ fn process_bootstrap_event(
0, //with RPC no way to know the slot of the account update. Set to 0.
);
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map, rpc_url)
let leader_schedule_result = bootstrap_current_leader_schedule(slots_in_epoch);
BootstrapEvent::AccountsMerged(
stake_map,
stake_history,
vote_map,
rpc_url,
leader_schedule_result,
)
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::AccountsMerged(stake_map, stake_history, vote_map, rpc_url) => {
BootstrapEvent::AccountsMerged(
stake_map,
stake_history,
vote_map,
rpc_url,
leader_schedule_result,
) => {
log::info!("BootstrapEvent::AccountsMerged RECV");
match (
StakeStore::merge_stakestore(stakestore, stake_map, stake_history),
VoteStore::merge_votestore(votestore, vote_map),
stakestore.stakes.merge((stake_map, stake_history)),
votestore.votes.merge(vote_map),
) {
(Ok(()), Ok(())) => BootsrapProcessResult::End,
(Ok(()), Ok(())) => BootsrapProcessResult::End(leader_schedule_result),
_ => {
//TODO remove this error using type state
log::warn!("BootstrapEvent::AccountsMerged merge stake or vote fail, non extracted stake/vote map err, restart bootstrap");
@ -273,33 +310,41 @@ pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError>
res_stake
}
// pub struct BootstrapScheduleResult {
// schedule: CalculatedSchedule,
// vote_stakes: Vec<EpochVoteStakes>,
// }
pub fn bootstrap_current_leader_schedule(
slots_in_epoch: u64,
) -> anyhow::Result<CalculatedSchedule> {
let (current_epoch, current_stakes) =
let (current_epoch, current_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(CURRENT_EPOCH_VOTE_STAKES_FILE)?;
let (next_epoch, next_stakes) =
let (next_epoch, next_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?;
//calcualte leader schedule for all vote stakes.
let current_schedule = crate::leader_schedule::calculate_leader_schedule(
&current_stakes,
&current_epoch_stakes,
current_epoch,
slots_in_epoch,
);
let next_schedule =
crate::leader_schedule::calculate_leader_schedule(&next_stakes, next_epoch, slots_in_epoch);
let next_schedule = crate::leader_schedule::calculate_leader_schedule(
&next_epoch_stakes,
next_epoch,
slots_in_epoch,
);
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule: current_schedule,
//TODO use epoch stake for get_vote_accounts
schedule_by_node: LeaderScheduleGeneratedData::get_schedule_by_nodes(&current_schedule),
schedule_by_slot: current_schedule.get_slot_leaders().to_vec(),
epoch: current_epoch,
}),
next: Some(LeaderScheduleData {
schedule: next_schedule,
//TODO use epoch stake for get_vote_accounts
// vote_stakes: next_stakes.stake_vote_map,
schedule_by_node: LeaderScheduleGeneratedData::get_schedule_by_nodes(&next_schedule),
schedule_by_slot: next_schedule.get_slot_leaders().to_vec(),
epoch: next_epoch,
}),
})

View File

@ -1,6 +1,8 @@
use crate::stake::{StakeMap, StakeStore};
use crate::utils::{Takable, TakeResult};
use crate::vote::StoredVote;
use crate::vote::{VoteMap, VoteStore};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@ -15,11 +17,24 @@ use tokio::task::JoinHandle;
#[derive(Debug)]
pub struct LeaderScheduleGeneratedData {
pub schedule: HashMap<String, Vec<usize>>,
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub schedule: LeaderSchedule,
pub epoch_vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub epoch: u64,
}
impl LeaderScheduleGeneratedData {
pub fn get_schedule_by_nodes(schedule: &LeaderSchedule) -> HashMap<String, Vec<usize>> {
schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EpochStake {
epoch: u64,
@ -92,11 +107,8 @@ fn process_leadershedule_event(
) -> LeaderScheduleResult {
match event {
LeaderScheduleEvent::Init(new_epoch, slots_in_epoch, new_rate_activation_epoch) => {
match (
StakeStore::take_stakestore(stakestore),
VoteStore::take_votestore(votestore),
) {
(Ok((stake_map, mut stake_history)), Ok(vote_map)) => {
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map(((stake_map, mut stake_history), vote_map)) => {
log::info!("LeaderScheduleEvent::CalculateScedule");
//do the calculus in a blocking task.
let jh = tokio::task::spawn_blocking({
@ -142,12 +154,13 @@ fn process_leadershedule_event(
}
log::info!("End calculate leader schedule");
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
LeaderScheduleGeneratedData {
schedule: leader_schedule,
vote_stakes: epoch_vote_stakes,
epoch_vote_stakes,
epoch: next_epoch,
},
(new_epoch, slots_in_epoch, new_rate_activation_epoch),
@ -157,13 +170,22 @@ fn process_leadershedule_event(
});
LeaderScheduleResult::TaskHandle(jh)
}
_ => {
log::error!("Create leadershedule init event error during extract store");
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
new_epoch,
slots_in_epoch,
new_rate_activation_epoch,
))
TakeResult::Taken(stake_notify) => {
let notif_jh = tokio::spawn({
async move {
let notifs = stake_notify
.iter()
.map(|n| n.notified())
.collect::<Vec<tokio::sync::futures::Notified>>();
join_all(notifs).await;
LeaderScheduleEvent::Init(
new_epoch,
slots_in_epoch,
new_rate_activation_epoch,
)
}
});
LeaderScheduleResult::TaskHandle(notif_jh)
}
}
}
@ -176,8 +198,8 @@ fn process_leadershedule_event(
) => {
log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV");
match (
StakeStore::merge_stakestore(stakestore, stake_map, stake_history),
VoteStore::merge_votestore(votestore, vote_map),
stakestore.stakes.merge((stake_map, stake_history)),
votestore.votes.merge(vote_map),
) {
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
_ => {
@ -263,7 +285,7 @@ pub fn calculate_leader_schedule(
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
slots_in_epoch: u64,
) -> HashMap<String, Vec<usize>> {
) -> LeaderSchedule {
let stakes_map: HashMap<Pubkey, u64> = stake_vote_map
.iter()
.filter_map(|(_, (stake, vote_account))| {
@ -281,16 +303,7 @@ pub fn calculate_leader_schedule(
sort_stakes(&mut stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
let schedule = LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS);
let slot_schedule = schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect();
slot_schedule
schedule
}
// Cribbed from leader_schedule_utils

View File

@ -1,15 +1,19 @@
use crate::account::AccountPretty;
use crate::bootstrap::BootstrapEvent;
use crate::leader_schedule::LeaderScheduleGeneratedData;
use crate::utils::{Takable, TakeResult};
use futures::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_lite_rpc_core::types::SlotStream;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
@ -34,6 +38,10 @@ type Slot = u64;
pub async fn start_stakes_and_votes_loop(
mut data_cache: DataCache,
mut slot_notification: SlotStream,
mut vote_account_rpc_request: Receiver<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
rpc_client: Arc<RpcClient>,
grpc_url: String,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
@ -51,20 +59,11 @@ pub async fn start_stakes_and_votes_loop(
let mut current_schedule_epoch =
crate::bootstrap::bootstrap_scheduleepoch_data(&data_cache).await;
match crate::bootstrap::bootstrap_current_leader_schedule(
current_schedule_epoch.slots_in_epoch,
) {
Ok(current_schedule_data) => {
data_cache.leader_schedule = Arc::new(current_schedule_data)
}
Err(err) => {
log::warn!("Error during current leader schedule bootstrap from files:{err}")
}
}
//future execution collection.
let mut spawned_leader_schedule_task = FuturesUnordered::new();
let mut spawned_bootstrap_task = FuturesUnordered::new();
let mut rpc_notify_task = FuturesUnordered::new();
let mut rpc_exec_task = FuturesUnordered::new();
let jh = tokio::spawn(async move {
BootstrapEvent::InitBootstrap {
sleep_time: 1,
@ -74,6 +73,7 @@ pub async fn start_stakes_and_votes_loop(
spawned_bootstrap_task.push(jh);
let mut bootstrap_done = false;
let mut pending_rpc_request = vec![];
loop {
tokio::select! {
@ -93,6 +93,84 @@ pub async fn start_stakes_and_votes_loop(
}
}
}
Some((config, return_channel)) = vote_account_rpc_request.recv() => {
pending_rpc_request.push(return_channel);
let current_slot = crate::utils::get_current_confirmed_slot(&data_cache).await;
let vote_accounts = votestore.vote_stakes_for_epoch(0); //TODO define epoch storage.
match votestore.votes.take() {
TakeResult::Map(votes) => {
let jh = tokio::task::spawn_blocking({
move || {
let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info(
current_slot,
&votes,
&vote_accounts.as_ref().unwrap().vote_stakes, //TODO put in take.
config,
);
(votes, vote_accounts, rpc_vote_accounts)
}
});
rpc_exec_task.push(jh);
}
TakeResult::Taken(mut stake_notify) => {
let notif_jh = tokio::spawn({
async move {
stake_notify.pop().unwrap().notified().await;
(current_slot, vote_accounts, config)
}
});
rpc_notify_task.push(notif_jh);
}
}
}
//manage rpc waiting request notification.
Some(Ok((votes, vote_accounts, rpc_vote_accounts))) = rpc_exec_task.next() => {
if let Err(err) = votestore.votes.merge(votes) {
log::info!("Error during RPC get vote account merge:{err}");
}
//avoid clone on the first request
//TODO change the logic use take less one.
if pending_rpc_request.len() == 1 {
if let Err(_) = pending_rpc_request.pop().unwrap().send(rpc_vote_accounts.clone()) {
log::error!("Vote accounts RPC channel send closed.");
}
} else {
for return_channel in pending_rpc_request.drain(..) {
if let Err(_) = return_channel.send(rpc_vote_accounts.clone()) {
log::error!("Vote accounts RPC channel send closed.");
}
}
}
}
//manage rpc waiting request notification.
Some(Ok((current_slot, vote_accounts, config))) = rpc_notify_task.next() => {
match votestore.votes.take() {
TakeResult::Map(votes) => {
let jh = tokio::task::spawn_blocking({
move || {
let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info(
current_slot,
&votes,
&vote_accounts.as_ref().unwrap().vote_stakes, //TODO put in take.
config,
);
(votes, vote_accounts, rpc_vote_accounts)
}
});
rpc_exec_task.push(jh);
}
TakeResult::Taken(mut stake_notify) => {
let notif_jh = tokio::spawn({
async move {
stake_notify.pop().unwrap().notified().await;
(current_slot, vote_accounts, config)
}
});
rpc_notify_task.push(notif_jh);
}
}
}
//manage geyser account notification
//Geyser delete account notification patch must be installed on the validator.
//see https://github.com/solana-labs/solana/pull/33292
@ -129,7 +207,7 @@ pub async fn start_stakes_and_votes_loop(
//log::info!("Geyser notif VOTE account:{}", account);
let account_pubkey = account.pubkey;
//process vote accout notification
if let Err(err) = votestore.add_vote(account, current_schedule_epoch.last_slot_in_epoch) {
if let Err(err) = votestore.notify_vote_change(account, current_schedule_epoch.last_slot_in_epoch) {
log::warn!("Can't add new stake from account data err:{} account:{}", err, account_pubkey);
continue;
}
@ -163,8 +241,22 @@ pub async fn start_stakes_and_votes_loop(
}
//manage bootstrap event
Some(Ok(event)) = spawned_bootstrap_task.next() => {
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore) {
Ok(Some(boot_res))=> bootstrap_done = boot_res,
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch) {
Ok(Some(boot_res))=> {
match boot_res {
Ok(current_schedule_data) => {
//let data_schedule = Arc::make_mut(&mut data_cache.leader_schedule);
data_cache.leader_schedule = Arc::new(current_schedule_data);
bootstrap_done = true;
}
Err(err) => {
log::warn!("Error during current leader schedule bootstrap from files:{err}")
}
}
},
Ok(None) => (),
Err(err) => log::error!("Stake / Vote Account bootstrap fail because '{err}'"),
}
@ -183,10 +275,11 @@ pub async fn start_stakes_and_votes_loop(
data_schedule.current = data_schedule.next.take();
match new_leader_schedule {
//TODO use vote_stakes for vote accounts RPC call.
Some(LeaderScheduleGeneratedData{schedule, vote_stakes, epoch}) => {
Some(schedule_data) => {
let new_schedule_data = LeaderScheduleData{
schedule,
epoch
schedule_by_node: LeaderScheduleGeneratedData::get_schedule_by_nodes(&schedule_data.schedule),
schedule_by_slot: schedule_data.schedule.get_slot_leaders().to_vec(),
epoch: schedule_data.epoch
};
data_schedule.next = Some(new_schedule_data);
}

View File

@ -1,5 +1,8 @@
use crate::utils::Takable;
use crate::utils::TakableContent;
use crate::utils::TakableMap;
use crate::utils::TakeResult;
use crate::utils::UpdateAction;
use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
@ -13,30 +16,30 @@ use std::collections::HashMap;
pub type StakeMap = HashMap<Pubkey, StoredStake>;
type StakeContent = (StakeMap, Option<StakeHistory>);
#[derive(Debug, Default)]
pub enum StakeAction {
Notify {
stake: StoredStake,
},
Remove(Pubkey, Slot),
// Merge {
// source_account: Pubkey,
// destination_account: Pubkey,
// update_slot: Slot,
// },
#[default]
None,
}
// #[derive(Debug, Default)]
// pub enum StakeAction {
// Notify {
// stake: StoredStake,
// },
// Remove(Pubkey, Slot),
// // Merge {
// // source_account: Pubkey,
// // destination_account: Pubkey,
// // update_slot: Slot,
// // },
// #[default]
// None,
// }
impl StakeAction {
fn get_update_slot(&self) -> u64 {
match self {
StakeAction::Notify { stake } => stake.last_update_slot,
StakeAction::Remove(_, slot) => *slot,
StakeAction::None => 0,
}
}
}
// impl StakeAction {
// fn get_update_slot(&self) -> u64 {
// match self {
// StakeAction::Notify { stake } => stake.last_update_slot,
// StakeAction::Remove(_, slot) => *slot,
// StakeAction::None => 0,
// }
// }
// }
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StoredStake {
@ -47,15 +50,15 @@ pub struct StoredStake {
pub write_version: u64,
}
impl TakableContent<StakeAction> for StakeContent {
fn add_value(&mut self, val: StakeAction) {
impl TakableContent<StoredStake> for StakeContent {
fn add_value(&mut self, val: UpdateAction<StoredStake>) {
StakeStore::process_stake_action(&mut self.0, val);
}
}
#[derive(Debug, Default)]
pub struct StakeStore {
stakes: TakableMap<StakeAction, StakeContent>,
pub stakes: TakableMap<StoredStake, StakeContent>,
}
impl StakeStore {
@ -76,9 +79,9 @@ impl StakeStore {
) -> anyhow::Result<()> {
//if lamport == 0 the account has been removed.
if account.lamports == 0 {
self.notify_stake_action(
StakeAction::Remove(account.pubkey, account.slot),
current_end_epoch_slot,
self.stakes.add_value(
UpdateAction::Remove(account.pubkey, account.slot),
account.slot <= current_end_epoch_slot,
);
} else {
let Ok(delegated_stake_opt) = account.read_stake() else {
@ -94,26 +97,36 @@ impl StakeStore {
write_version: account.write_version,
};
self.notify_stake_action(StakeAction::Notify { stake }, current_end_epoch_slot);
let action_update_slot = stake.last_update_slot;
self.stakes.add_value(
UpdateAction::Notify(action_update_slot, stake),
action_update_slot <= current_end_epoch_slot,
);
}
}
Ok(())
}
pub fn notify_stake_action(&mut self, action: StakeAction, current_end_epoch_slot: Slot) {
let action_update_slot = action.get_update_slot();
self.stakes
.add_value(action, action_update_slot <= current_end_epoch_slot);
}
//helper method to extract and merge stakes.
// pub fn take_stakestore(&mut self) -> TakeResult<StakeContent> {
// self.stakes.take()
// }
fn process_stake_action(stakes: &mut StakeMap, action: StakeAction) {
// pub fn merge_stakestore(
// &mut self,
// stake_map: StakeMap,
// stake_hisotry: Option<StakeHistory>,
// ) -> anyhow::Result<()> {
// self.stakes.merge((stake_map, stake_hisotry))
// }
fn process_stake_action(stakes: &mut StakeMap, action: UpdateAction<StoredStake>) {
match action {
StakeAction::Notify { stake } => {
UpdateAction::Notify(_, stake) => {
Self::notify_stake(stakes, stake);
}
StakeAction::Remove(account_pk, slot) => Self::remove_stake(stakes, &account_pk, slot),
StakeAction::None => (),
UpdateAction::Remove(account_pk, slot) => Self::remove_stake(stakes, &account_pk, slot),
}
}
fn notify_stake(map: &mut StakeMap, stake: StoredStake) {
@ -151,20 +164,20 @@ impl StakeStore {
}
}
//helper method to extract and merge stakes.
pub fn take_stakestore(
stakestore: &mut StakeStore,
) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
crate::utils::take(&mut stakestore.stakes)
}
// //helper method to extract and merge stakes.
// pub fn take_stakestore(
// stakestore: &mut StakeStore,
// ) -> anyhow::Result<(StakeMap, Option<StakeHistory>)> {
// crate::utils::take(&mut stakestore.stakes)
// }
pub fn merge_stakestore(
stakestore: &mut StakeStore,
stake_map: StakeMap,
stake_history: Option<StakeHistory>,
) -> anyhow::Result<()> {
crate::utils::merge(&mut stakestore.stakes, (stake_map, stake_history))
}
// pub fn merge_stakestore(
// stakestore: &mut StakeStore,
// stake_map: StakeMap,
// stake_history: Option<StakeHistory>,
// ) -> anyhow::Result<()> {
// crate::utils::merge(&mut stakestore.stakes, (stake_map, stake_history))
// }
}
pub fn merge_program_account_in_strake_map(

View File

@ -1,4 +1,5 @@
use crate::vote::StoredVote;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::stores::block_information_store::BlockInformation;
@ -12,6 +13,7 @@ use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Notify;
pub async fn get_current_confirmed_slot(data_cache: &DataCache) -> u64 {
let commitment = CommitmentConfig::confirmed();
@ -73,9 +75,120 @@ pub fn save_schedule_vote_stakes(
Ok(())
}
#[derive(Debug)]
pub enum UpdateAction<Account> {
Notify(Slot, Account),
Remove(Pubkey, Slot),
}
// impl<Account> UpdateAction<Account> {
// pub fn get_update_slot(&self) -> u64 {
// match self {
// UpdateAction::Notify(slot, _) | UpdateAction::Remove(_, slot) => *slot,
// }
// }
// }
pub enum TakeResult<C> {
//Vec because can wait on several collection to be merged
Taken(Vec<Arc<Notify>>),
Map(C),
}
impl<C1> TakeResult<C1> {
pub fn and_then<C2>(self, action: TakeResult<C2>) -> TakeResult<(C1, C2)> {
match (self, action) {
(TakeResult::Taken(mut notif1), TakeResult::Taken(mut notif2)) => {
notif1.append(&mut notif2);
TakeResult::Taken(notif1)
}
(TakeResult::Map(content1), TakeResult::Map(content2)) => {
TakeResult::Map((content1, content2))
}
_ => unreachable!("Bad take result association."), //TODO add mix result.
}
}
// pub fn get_content(self) -> Option<C1> {
// match self {
// TakeResult::Taken(_) => None,
// TakeResult::Map(content) => Some(content),
// }
// }
}
//Takable struct code
pub trait TakableContent<T>: Default {
fn add_value(&mut self, val: T);
fn add_value(&mut self, val: UpdateAction<T>);
}
//Takable struct code
pub trait Takable<C> {
fn take(self) -> TakeResult<C>;
fn merge(self, content: C) -> anyhow::Result<()>;
fn is_taken(&self) -> bool;
}
impl<'a, T, C: TakableContent<T>> Takable<C> for &'a mut TakableMap<T, C> {
fn take(self) -> TakeResult<C> {
match self.content.take() {
Some(content) => TakeResult::Map(content),
None => TakeResult::Taken(vec![Arc::clone(&self.notifier)]),
}
}
fn merge(mut self, mut content: C) -> anyhow::Result<()> {
if self.content.is_none() {
//apply stake added during extraction.
for val in self.updates.drain(..) {
content.add_value(val);
}
self.content = Some(content);
self.notifier.notify_waiters();
Ok(())
} else {
bail!("TakableMap with a existing content".to_string())
}
}
fn is_taken(&self) -> bool {
self.content.is_none()
}
}
impl<'a, T1, T2, C1: TakableContent<T1>, C2: TakableContent<T2>> Takable<(C1, C2)>
for (&'a mut TakableMap<T1, C1>, &'a mut TakableMap<T2, C2>)
{
fn take(self) -> TakeResult<(C1, C2)> {
let first = self.0;
let second = self.1;
match (first.is_taken(), second.is_taken()) {
(true, true) | (false, false) => first.take().and_then(second.take()),
(true, false) => {
match first.take() {
TakeResult::Taken(notif) => TakeResult::Taken(notif),
TakeResult::Map(_) => unreachable!(), //tested before.
}
}
(false, true) => {
match second.take() {
TakeResult::Taken(notif) => TakeResult::Taken(notif),
TakeResult::Map(_) => unreachable!(), //tested before.
}
}
}
}
fn merge(self, content: (C1, C2)) -> anyhow::Result<()> {
self.0
.merge(content.0)
.and_then(|_| self.1.merge(content.1))
}
fn is_taken(&self) -> bool {
self.0.is_taken() && self.1.is_taken()
}
}
///A struct that hold a collection call content that can be taken during some time and merged after.
@ -83,84 +196,60 @@ pub trait TakableContent<T>: Default {
///It allow to process struct content while allowing to still update it without lock.
#[derive(Default, Debug)]
pub struct TakableMap<T, C: TakableContent<T>> {
pub content: C,
pub updates: Vec<T>,
taken: bool,
pub content: Option<C>,
pub updates: Vec<UpdateAction<T>>,
notifier: Arc<Notify>,
}
impl<T: Default, C: TakableContent<T> + Default> TakableMap<T, C> {
pub fn new(content: C) -> Self {
TakableMap {
content,
content: Some(content),
updates: vec![],
taken: false,
notifier: Arc::new(Notify::new()),
}
}
//add a value to the content if not taken or put it in the update waiting list.
//Use force_in_update to force the insert in update waiting list.
pub fn add_value(&mut self, val: T, force_in_update: bool) {
pub fn add_value(&mut self, val: UpdateAction<T>, force_in_update: bool) {
//during extract push the new update or
//don't insert now account change that has been done in next epoch.
//put in update pool to be merged next epoch change.
match self.taken || force_in_update {
match self.content.is_none() || force_in_update {
true => self.updates.push(val),
false => self.content.add_value(val),
false => {
let content = self.content.as_mut().unwrap(); //unwrap tested
content.add_value(val);
}
}
}
pub fn take(self) -> (Self, C) {
let takenmap = TakableMap {
content: C::default(),
updates: self.updates,
taken: true,
};
(takenmap, self.content)
}
pub fn merge(self, content: C) -> Self {
let mut mergedstore = TakableMap {
content,
updates: vec![],
taken: false,
};
//apply stake added during extraction.
for val in self.updates {
mergedstore.content.add_value(val);
}
mergedstore
}
pub fn is_taken(&self) -> bool {
self.taken
}
}
pub fn take<T: Default, C: TakableContent<T> + Default>(
map: &mut TakableMap<T, C>,
) -> anyhow::Result<C> {
if map.is_taken() {
bail!("TakableMap already taken. Try later");
}
let new_store = std::mem::take(map);
let (new_store, content) = new_store.take();
*map = new_store;
Ok(content)
}
// pub fn take<T: Default, C: TakableContent<T> + Default>(
// map: &mut TakableMap<T, C>,
// ) -> anyhow::Result<C> {
// if map.is_taken() {
// bail!("TakableMap already taken. Try later");
// }
// let new_store = std::mem::take(map);
// let (new_store, content) = new_store.take();
// *map = new_store;
// Ok(content)
// }
pub fn merge<T: Default, C: TakableContent<T> + Default>(
map: &mut TakableMap<T, C>,
content: C,
) -> anyhow::Result<()> {
if !map.is_taken() {
bail!("TakableMap merge of non taken map. Try later");
}
let new_store = std::mem::take(map);
let new_store = new_store.merge(content);
*map = new_store;
Ok(())
}
// pub fn merge<T: Default, C: TakableContent<T> + Default>(
// map: &mut TakableMap<T, C>,
// content: C,
// ) -> anyhow::Result<()> {
// if !map.is_taken() {
// bail!("TakableMap merge of non taken map. Try later");
// }
// let new_store = std::mem::take(map);
// let new_store = new_store.merge(content);
// *map = new_store;
// Ok(())
// }
#[cfg(test)]
mod tests {
@ -169,37 +258,55 @@ mod tests {
#[test]
fn test_takable_struct() {
impl TakableContent<u64> for Vec<u64> {
fn add_value(&mut self, val: u64) {
self.push(val)
fn add_value(&mut self, val: UpdateAction<u64>) {
match val {
UpdateAction::Notify(account, _) => self.push(account),
UpdateAction::Remove(_, _) => (),
UpdateAction::None => (),
}
}
}
let content: Vec<u64> = vec![];
let mut takable = TakableMap::new(content);
takable.add_value(23, false);
assert_eq!(takable.content.len(), 1);
takable.add_value(UpdateAction::Notify(23, 0), false);
assert_eq!(takable.content.as_ref().unwrap().len(), 1);
takable.add_value(24, true);
assert_eq!(takable.content.len(), 1);
takable.add_value(UpdateAction::Notify(24, 0), true);
assert_eq!(takable.content.as_ref().unwrap().len(), 1);
assert_eq!(takable.updates.len(), 1);
let content = take(&mut takable).unwrap();
assert_eq!(content.len(), 1);
assert_eq!(takable.content.len(), 0);
let take_content = (&mut takable).take();
assert_take_content_map(&take_content, 1);
assert_eq!(takable.updates.len(), 1);
let err_content = take(&mut takable);
assert!(err_content.is_err());
assert_eq!(content.len(), 1);
assert_eq!(takable.content.len(), 0);
let take_content = (&mut takable).take();
assert_take_content_taken(&take_content);
assert!(takable.content.is_none());
assert_eq!(takable.updates.len(), 1);
takable.add_value(25, false);
assert_eq!(takable.content.len(), 0);
takable.add_value(UpdateAction::Notify(25, 0), false);
assert_eq!(takable.updates.len(), 2);
merge(&mut takable, content).unwrap();
assert_eq!(takable.content.len(), 3);
let content = match take_content {
TakeResult::Taken(_) => panic!("not a content"),
TakeResult::Map(content) => content,
};
takable.merge(content);
assert_eq!(takable.content.as_ref().unwrap().len(), 3);
assert_eq!(takable.updates.len(), 0);
let err = merge(&mut takable, vec![]);
assert!(err.is_err());
//merge(&mut takable, vec![]);
//assert!(err.is_err());
}
fn assert_take_content_map(take_content: &TakeResult<Vec<u64>>, len: usize) {
match take_content {
TakeResult::Taken(_) => assert!(false),
TakeResult::Map(content) => assert_eq!(content.len(), len),
}
}
fn assert_take_content_taken(take_content: &TakeResult<Vec<u64>>) {
match take_content {
TakeResult::Taken(_) => (),
TakeResult::Map(_) => assert!(false),
}
}
}

View File

@ -1,9 +1,14 @@
use crate::utils::TakableContent;
use crate::utils::TakableMap;
use crate::utils::UpdateAction;
use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_rpc_client_api::request::MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY;
use solana_rpc_client_api::response::RpcVoteAccountInfo;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState;
@ -12,9 +17,15 @@ use std::sync::Arc;
pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
#[derive(Debug, Clone)]
pub struct EpochVoteStakes {
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub epoch: u64,
}
impl TakableContent<StoredVote> for VoteMap {
fn add_value(&mut self, val: StoredVote) {
VoteStore::vote_map_insert_vote(self, val.pubkey, val);
fn add_value(&mut self, val: UpdateAction<StoredVote>) {
VoteStore::process_vote_action(self, val);
}
}
@ -26,29 +37,84 @@ pub struct StoredVote {
pub write_version: u64,
}
impl StoredVote {
pub fn convert_to_rpc_vote_account_info(
&self,
activated_stake: u64,
epoch_vote_account: bool,
) -> RpcVoteAccountInfo {
let last_vote = self
.vote_data
.votes
.iter()
.last()
.map(|vote| vote.slot())
.unwrap_or_default();
RpcVoteAccountInfo {
vote_pubkey: self.pubkey.to_string(),
node_pubkey: self.vote_data.node_pubkey.to_string(),
activated_stake,
commission: self.vote_data.commission,
epoch_vote_account,
epoch_credits: self.vote_data.epoch_credits.clone(),
last_vote,
root_slot: self.vote_data.root_slot.unwrap_or_default(),
}
}
}
#[derive(Default)]
pub struct VoteStore {
votes: TakableMap<StoredVote, VoteMap>,
pub votes: TakableMap<StoredVote, VoteMap>,
epoch_vote_stake_map: HashMap<u64, EpochVoteStakes>,
}
impl VoteStore {
pub fn new(capacity: usize) -> Self {
VoteStore {
votes: TakableMap::new(HashMap::with_capacity(capacity)),
epoch_vote_stake_map: HashMap::new(),
}
}
pub fn add_vote(
pub fn add_epoch_vote_stake(&mut self, stakes: EpochVoteStakes) {
self.epoch_vote_stake_map.insert(stakes.epoch, stakes);
}
pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option<EpochVoteStakes> {
self.epoch_vote_stake_map.get(&epoch).cloned()
}
pub fn notify_vote_change(
&mut self,
new_account: AccountPretty,
current_end_epoch_slot: Slot,
) -> anyhow::Result<()> {
if new_account.lamports == 0 {
self.remove_from_store(&new_account.pubkey, new_account.slot);
//self.remove_from_store(&new_account.pubkey, new_account.slot);
self.votes.add_value(
UpdateAction::Remove(new_account.pubkey, new_account.slot),
new_account.slot <= current_end_epoch_slot,
);
} else {
let Ok(vote_data) = new_account.read_vote() else {
let Ok(mut vote_data) = new_account.read_vote() else {
bail!("Can't read Vote from account data");
};
//remove unnecessary entry. See Solana code rpc::rpc::get_vote_accounts
let epoch_credits = vote_data.epoch_credits();
vote_data.epoch_credits =
if epoch_credits.len() > MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY {
epoch_credits
.iter()
.skip(epoch_credits.len() - MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY)
.cloned()
.collect()
} else {
epoch_credits.clone()
};
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
let new_voteacc = StoredVote {