Merge branch 'main' into fix/grpc-multiplex-logging

This commit is contained in:
galactus 2024-01-11 13:18:41 +01:00 committed by GitHub
commit 7f55e5b361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 4843 additions and 790 deletions

View File

@ -46,6 +46,6 @@ jobs:
- name: Early Build
run: |
cargo build --locked --workspace --all-targets
- name: Run Tests
run: RUST_LOG=info cargo test

2513
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,6 @@
[workspace]
resolver = "2"
members = [
"core",
"services",
@ -7,6 +9,7 @@ members = [
"quic-forward-proxy-integration-test",
"cluster-endpoints",
"history",
"stake_vote",
"bench"
]
@ -28,12 +31,15 @@ solana-net-utils = "~1.16.3"
solana-pubsub-client = "~1.16.3"
solana-streamer = "~1.16.3"
solana-account-decoder = "~1.16.3"
solana-ledger = "~1.16.3"
solana-program = "~1.16.3"
itertools = "0.10.5"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
borsh = "0.10.3"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
@ -58,6 +64,7 @@ solana-lite-rpc-services = {path = "services", version="0.2.3"}
solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
solana-lite-rpc-history = {path = "history", version="0.2.3"}
solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.3"}
async-trait = "0.1.68"
yellowstone-grpc-client = "1.11.0"

View File

@ -20,7 +20,6 @@ solana tpu ([details](quic-forward-proxy/README.md)).
### Confirmation strategies
1) Subscribe to new blocks using websockets (deprecated)
(depricated)
2) Polling blocks over RPC.(Current)
3) Subscribe blocks over gRPC.
(Current)

View File

@ -76,7 +76,7 @@ export async function main() {
}
main().then(x => {
console.log('finished sucessfully')
console.log('finished successfully')
}).catch(e => {
console.log('caught an error : ' + e)
})

View File

@ -89,7 +89,7 @@ const skip_confirmations = get_postional_arg(5, false) === "true";
}
}
console.log("sucesses : " + successes)
console.log("successes : " + successes)
console.log("failures : " + failures)
//console.log("time taken to send : " + time_taken_to_send)
}

View File

@ -10,7 +10,7 @@ license = "AGPL"
[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
#geyser-grpc-connector = { tag = "v0.5.0+yellowstone.1.11+solana.1.16.17", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = "0.7.1+yellowstone.1.11"
geyser-grpc-connector = "0.7.2+yellowstone.1.11"
solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }

View File

@ -0,0 +1,85 @@
use anyhow::{bail, Error};
use async_trait::async_trait;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::{
structures::leader_data::LeaderData, traits::leaders_fetcher_interface::LeaderFetcherInterface,
};
use std::sync::Arc;
use tokio::sync::RwLock;
// Stores leaders for slots from older to newer in leader schedule
// regularly removed old leaders and adds new ones
pub struct GrpcLeaderGetter {
epoch_data: EpochCache,
leader_schedule: Arc<RwLock<CalculatedSchedule>>,
}
impl GrpcLeaderGetter {
pub fn new(leader_schedule: Arc<RwLock<CalculatedSchedule>>, epoch_data: EpochCache) -> Self {
Self {
leader_schedule,
epoch_data,
}
}
}
#[async_trait]
impl LeaderFetcherInterface for GrpcLeaderGetter {
async fn get_slot_leaders(
&self,
from: solana_sdk::slot_history::Slot,
to: solana_sdk::slot_history::Slot,
) -> anyhow::Result<Vec<LeaderData>> {
//get epoch of from/to slot to see if they're in the current stored epoch.
let from_epoch = self.epoch_data.get_epoch_at_slot(from).epoch;
let to_epoch = self.epoch_data.get_epoch_at_slot(to).epoch;
let leader_schedule_data = self.leader_schedule.read().await;
let current_epoch = leader_schedule_data
.current
.as_ref()
.map(|e| e.epoch)
.unwrap_or(from_epoch);
let next_epoch = leader_schedule_data
.current
.as_ref()
.map(|e| e.epoch)
.unwrap_or(to_epoch)
+ 1;
if from > to {
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} from:{from} > to:{to}"
);
}
if from_epoch < current_epoch || from_epoch > next_epoch {
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
from_epoch:{from_epoch} < current_epoch:{current_epoch} \
|| from_epoch > next_epoch:{next_epoch}"
);
}
if to_epoch < current_epoch || to_epoch > next_epoch {
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
to_epoch:{to_epoch} < current_epoch:{current_epoch} \
|| to_epoch:{to_epoch} > next_epoch:{next_epoch}"
);
}
let limit = to - from;
let schedule = leader_schedule_data
.get_slot_leaders(from, limit, self.epoch_data.get_epoch_schedule())
.await
.map_err(Error::msg)?;
Ok(schedule
.into_iter()
.enumerate()
.map(|(index, pubkey)| LeaderData {
leader_slot: from + index as u64,
pubkey,
})
.collect())
}
}

View File

@ -1,13 +1,13 @@
use crate::grpc_stream_utils::channelize_stream;
use crate::grpc_subscription::map_block_update;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use log::info;
use merge_streams::MergeStreams;
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
@ -36,6 +36,21 @@ impl FromYellowstoneExtractor for BlockExtractor {
}
}
struct BlockMetaHashExtractor(CommitmentConfig);
impl FromYellowstoneExtractor for BlockMetaHashExtractor {
type Target = String;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, String)> {
match update.update_oneof {
Some(UpdateOneof::BlockMeta(block_meta)) => {
Some((block_meta.slot, block_meta.blockhash))
}
_ => None,
}
}
}
/// connect to multiple grpc sources to consume confirmed blocks and block status update
pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
@ -47,13 +62,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
info!("- connection to {}", grpc_source);
}
let _timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
};
let multiplex_stream_confirmed = {
let confirmed_blocks_stream = {
let commitment_config = CommitmentConfig::confirmed();
let mut streams = Vec::new();
@ -68,28 +77,77 @@ pub fn create_grpc_multiplex_blocks_subscription(
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
};
let multiplex_stream_finalized = {
let finalized_blockmeta_stream = {
let commitment_config = CommitmentConfig::finalized();
let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
GeyserFilter(commitment_config).blocks_meta(),
);
streams.push(stream);
}
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
};
let merged_stream_confirmed_finalize =
(multiplex_stream_confirmed, multiplex_stream_finalized).merge();
// return value is the broadcast receiver
let (producedblock_sender, blocks_output_stream) =
tokio::sync::broadcast::channel::<ProducedBlock>(1000);
let (multiplexed_finalized_blocks, jh_channelizer) =
channelize_stream(merged_stream_confirmed_finalize);
let jh_block_emitter_task = {
tokio::task::spawn(async move {
// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);
(multiplexed_finalized_blocks, jh_channelizer)
let sender = producedblock_sender;
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
let mut last_finalized_slot: Slot = 0;
loop {
tokio::select! {
confirmed_block = confirmed_blocks_stream.next() => {
let confirmed_block = confirmed_block.expect("confirmed block from stream");
trace!("got confirmed block {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = sender.send(confirmed_block.clone()) {
warn!("Confirmed block channel has no receivers {e:?}");
continue
}
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
},
meta_finalized = finalized_blockmeta_stream.next() => {
let blockhash = meta_finalized.expect("finalized block meta from stream");
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
let finalized_block = cached_confirmed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
}
} else {
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
let size_before = recent_confirmed_blocks.len();
recent_confirmed_blocks.retain(|_blockhash, block| {
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
});
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
if cnt_cleaned > 0 {
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
}
}
}
}
})
};
(blocks_output_stream, jh_block_emitter_task)
}
struct SlotExtractor {}

View File

@ -1,5 +1,6 @@
pub mod endpoint_stremers;
pub mod grpc_inspect;
pub mod grpc_leaders_getter;
pub mod grpc_multiplex;
pub mod grpc_stream_utils;
pub mod grpc_subscription;

View File

@ -126,7 +126,7 @@ pub fn poll_block(
//slot poller
let slot_poller = tokio::spawn(async move {
log::info!("block listner started");
log::info!("block listener started");
let current_slot = rpc_client
.get_slot()
.await

View File

@ -90,7 +90,7 @@ pub fn poll_slots(
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Connot send slot notification")?;
.context("Cannot send slot notification")?;
}
}
}

View File

@ -1,7 +1,7 @@
{
"rpc_addr": "http://0.0.0.0:8899",
"ws_addr": "ws://0.0.0.0:8900",
"lite_rpc_http_addr": "http://0.0.0.0:8890",
"lite_rpc_http_addr": "[::]:8890",
"lite_rpc_ws_addr": "[::]:8891",
"fanout_size": 18,
"identity_keypair": null,
@ -10,6 +10,7 @@
"transaction_retry_after_secs": 3,
"quic_proxy_addr": null,
"use_grpc": false,
"calculate_leader_schedule_form_geyser": false,
"grpc_addr": "http://127.0.0.0:10000",
"grpc_x_token": null,
"postgres": {

View File

@ -1,4 +1,7 @@
use crate::stores::block_information_store::BlockInformation;
use crate::stores::data_cache::DataCache;
use serde::Serialize;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::{uses_durable_nonce, Transaction, VersionedTransaction};
@ -32,3 +35,12 @@ impl SerializableTransaction for VersionedTransaction {
self.uses_durable_nonce()
}
}
pub async fn get_current_confirmed_slot(data_cache: &DataCache) -> u64 {
let commitment = CommitmentConfig::confirmed();
let BlockInformation { slot, .. } = data_cache
.block_information_store
.get_latest_block(commitment)
.await;
slot
}

View File

@ -1,9 +1,10 @@
use std::sync::{atomic::AtomicU64, Arc};
use crate::structures::leaderschedule::CalculatedSchedule;
use dashmap::DashMap;
use solana_sdk::hash::Hash;
use solana_sdk::slot_history::Slot;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::sync::{atomic::AtomicU64, Arc};
use tokio::sync::RwLock;
use crate::{
stores::{
@ -37,6 +38,7 @@ pub struct DataCache {
pub identity_stakes: IdentityStakes,
pub cluster_info: ClusterInfo,
pub epoch_data: EpochCache,
pub leader_schedule: Arc<RwLock<CalculatedSchedule>>,
}
impl DataCache {
@ -87,6 +89,7 @@ impl DataCache {
store: Arc::new(DashMap::new()),
},
epoch_data: EpochCache::new_for_tests(),
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
}
}
}

View File

@ -44,6 +44,10 @@ impl EpochCache {
}
}
pub fn get_epoch_schedule(&self) -> &EpochSchedule {
self.epoch_schedule.as_ref()
}
pub fn get_slots_in_epoch(&self, epoch: u64) -> u64 {
self.epoch_schedule.get_slots_in_epoch(epoch)
}
@ -56,7 +60,9 @@ impl EpochCache {
self.epoch_schedule.get_last_slot_in_epoch(epoch)
}
pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
pub async fn bootstrap_epoch(
rpc_client: &RpcClient,
) -> anyhow::Result<(EpochCache, EpochInfo)> {
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
@ -68,9 +74,14 @@ impl EpochCache {
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
};
Ok(EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
})
let epoch_info = rpc_client.get_epoch_info().await?;
Ok((
EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
},
epoch_info,
))
}
}

View File

@ -0,0 +1,145 @@
use crate::stores::block_information_store::BlockInformation;
use crate::stores::data_cache::DataCache;
use solana_rpc_client_api::config::RpcGetVoteAccountsConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::ParsePubkeyError;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Clone, Default)]
pub struct GetVoteAccountsConfig {
pub vote_pubkey: Option<Pubkey>,
pub commitment: Option<CommitmentConfig>,
pub keep_unstaked_delinquents: Option<bool>,
pub delinquent_slot_distance: Option<u64>,
}
impl TryFrom<RpcGetVoteAccountsConfig> for GetVoteAccountsConfig {
type Error = ParsePubkeyError;
fn try_from(config: RpcGetVoteAccountsConfig) -> Result<Self, Self::Error> {
let vote_pubkey = config
.vote_pubkey
.as_ref()
.map(|pk| Pubkey::from_str(pk))
.transpose()?;
Ok(GetVoteAccountsConfig {
vote_pubkey,
commitment: config.commitment,
keep_unstaked_delinquents: config.keep_unstaked_delinquents,
delinquent_slot_distance: config.delinquent_slot_distance,
})
}
}
#[derive(Clone, Default, Debug)]
pub struct CalculatedSchedule {
pub current: Option<LeaderScheduleData>,
pub next: Option<LeaderScheduleData>,
}
impl CalculatedSchedule {
pub async fn get_leader_schedule_for_slot(
&self,
slot: Option<u64>,
commitment: Option<CommitmentConfig>,
data_cache: &DataCache,
) -> Option<HashMap<String, Vec<usize>>> {
log::debug!(
"get_leader_schedule_for_slot current:{:?} next:{:?} ",
self.current.clone().unwrap_or_default(),
self.next.clone().unwrap_or_default()
);
let commitment = commitment.unwrap_or_default();
let slot = match slot {
Some(slot) => slot,
None => {
let BlockInformation { slot, .. } = data_cache
.block_information_store
.get_latest_block(commitment)
.await;
slot
}
};
let epoch = data_cache.epoch_data.get_epoch_at_slot(slot);
let get_schedule = |schedule_data: Option<&LeaderScheduleData>| {
schedule_data.and_then(|current| {
(current.epoch == epoch.epoch).then_some(current.schedule_by_node.clone())
})
};
get_schedule(self.current.as_ref()).or_else(|| get_schedule(self.next.as_ref()))
}
pub async fn get_slot_leaders(
&self,
start_slot: Slot,
limit: u64,
epock_schedule: &EpochSchedule,
) -> Result<Vec<Pubkey>, String> {
log::debug!(
"get_slot_leaders rpc request received (start: {} limit: {})",
start_slot,
limit
);
pub const MAX_GET_SLOT_LEADERS: usize =
solana_rpc_client_api::request::MAX_GET_SLOT_LEADERS;
let mut limit = limit as usize;
if limit > MAX_GET_SLOT_LEADERS {
return Err(format!(
"Invalid Params: Invalid limit; max {MAX_GET_SLOT_LEADERS}"
));
}
let (epoch, slot_index) = epock_schedule.get_epoch_and_slot_index(start_slot);
let mut slot_leaders = Vec::with_capacity(limit);
let mut extend_slot_from_epoch = |leader_schedule: &[Pubkey], slot_index: usize| {
let take = limit.saturating_sub(slot_leaders.len());
slot_leaders.extend(leader_schedule.iter().skip(slot_index).take(take));
limit -= slot_leaders.len();
};
// log::info!(
// "get_slot_leaders epoch:{epoch} current:{:?} next:{:?} ",
// self.current.clone().unwrap_or_default(),
// self.next.clone().unwrap_or_default()
// );
//TODO manage more leader schedule data in storage.
//Here only search on current and next epoch
let res = [
(&self.current, slot_index as usize, epoch),
(&self.next, slot_index as usize, epoch),
(&self.next, 0, epoch + 1),
]
.into_iter()
.filter_map(|(epoch_data, slot_index, epoch)| {
epoch_data.as_ref().and_then(|epoch_data| {
(epoch_data.epoch == epoch).then_some((epoch_data, slot_index))
})
})
.map(|(epoch_data, slot_index)| {
extend_slot_from_epoch(&epoch_data.schedule_by_slot, slot_index);
})
.collect::<Vec<()>>();
match res.is_empty() {
true => Err(format!(
"Invalid Params: Invalid slot range: leader schedule for epoch {epoch} is unavailable"
)),
false => Ok(slot_leaders),
}
}
}
#[derive(Clone, Debug, Default)]
pub struct LeaderScheduleData {
pub schedule_by_node: HashMap<String, Vec<usize>>,
pub schedule_by_slot: Vec<Pubkey>,
pub epoch: u64,
}

View File

@ -3,6 +3,7 @@
pub mod epoch;
pub mod identity_stakes;
pub mod leader_data;
pub mod leaderschedule;
pub mod notifications;
pub mod produced_block;
pub mod proxy_request_format;

View File

@ -165,4 +165,12 @@ impl ProducedBlock {
rewards,
}
}
/// moving commitment level to finalized
pub fn to_finalized_block(&self) -> Self {
ProducedBlock {
commitment_config: CommitmentConfig::finalized(),
..self.clone()
}
}
}

View File

@ -55,6 +55,7 @@ Method calls:
##### Cluster info Domain
- [getclusternodes](https://docs.solana.com/api/http#getclusternodes) not in geyser plugin can be get from gossip. Try to update gyser first.
##### Validator Domain
- [getslot](https://docs.solana.com/api/http#getslot) Need top add 2 new commitment level for first shred seen and half confirm (1/3 of the stake has voted on the block)
- [getBlockHeight](https://docs.solana.com/api/http#getblockheight)

View File

@ -46,6 +46,7 @@ solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-lite-rpc-history = { workspace = true }
solana-lite-rpc-stakevote = { workspace = true }
[dev-dependencies]
bench = { path = "../bench" }

View File

@ -3,7 +3,9 @@ use crate::{
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
@ -22,21 +24,22 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{
RpcBlockConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper,
RpcContextConfig, RpcEncodingConfigWrapper, RpcEpochConfig, RpcGetVoteAccountsConfig,
RpcProgramAccountsConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
RpcSignatureSubscribeConfig, RpcSignaturesForAddressConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
RpcContextConfig, RpcEncodingConfigWrapper, RpcGetVoteAccountsConfig,
RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig,
RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, RpcSignaturesForAddressConfig,
RpcTransactionLogsConfig, RpcTransactionLogsFilter,
},
response::{
Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature,
RpcContactInfo, RpcLeaderSchedule, RpcPerfSample, RpcPrioritizationFee, RpcResponseContext,
RpcVersionInfo, RpcVoteAccountStatus,
RpcContactInfo, RpcPerfSample, RpcPrioritizationFee, RpcResponseContext, RpcVersionInfo,
RpcVoteAccountStatus,
},
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
use tokio::sync::oneshot;
lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
@ -62,6 +65,12 @@ pub struct LiteBridge {
rpc_client: Arc<RpcClient>,
transaction_service: TransactionService,
history: History,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
}
impl LiteBridge {
@ -70,12 +79,19 @@ impl LiteBridge {
data_cache: DataCache,
transaction_service: TransactionService,
history: History,
state_vote_sendder: Option<
tokio::sync::mpsc::Sender<(
GetVoteAccountsConfig,
oneshot::Sender<RpcVoteAccountStatus>,
)>,
>,
) -> Self {
Self {
rpc_client,
data_cache,
transaction_service,
history,
state_vote_sendder,
}
}
@ -267,21 +283,6 @@ impl LiteRpcServer for LiteBridge {
Ok(epoch_info)
}
async fn get_leader_schedule(
&self,
_slot: Option<Slot>,
_config: Option<RpcEncodingConfigWrapper<RpcEpochConfig>>,
) -> crate::rpc::Result<Option<RpcLeaderSchedule>> {
todo!()
}
async fn get_vote_accounts(
&self,
_config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus> {
todo!()
}
async fn get_recent_performance_samples(
&self,
_limit: Option<usize>,
@ -470,4 +471,61 @@ impl LiteRpcServer for LiteBridge {
async fn vote_subscribe(&self, _pending: PendingSubscriptionSink) -> SubscriptionResult {
todo!()
}
async fn get_leader_schedule(
&self,
slot: Option<u64>,
config: Option<RpcLeaderScheduleConfig>,
) -> crate::rpc::Result<Option<HashMap<String, Vec<usize>>>> {
//TODO verify leader identity.
let schedule = self
.data_cache
.leader_schedule
.read()
.await
.get_leader_schedule_for_slot(slot, config.and_then(|c| c.commitment), &self.data_cache)
.await;
Ok(schedule)
}
async fn get_slot_leaders(
&self,
start_slot: u64,
limit: u64,
) -> crate::rpc::Result<Vec<Pubkey>> {
let epock_schedule = self.data_cache.epoch_data.get_epoch_schedule();
self.data_cache
.leader_schedule
.read()
.await
.get_slot_leaders(start_slot, limit, epock_schedule)
.await
.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
}
async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus> {
let config: GetVoteAccountsConfig =
GetVoteAccountsConfig::try_from(config.unwrap_or_default()).unwrap_or_default();
if let Some(state_vote_sendder) = &self.state_vote_sendder {
let (tx, rx) = oneshot::channel();
if let Err(err) = state_vote_sendder.send((config, tx)).await {
return Err(jsonrpsee::core::Error::Custom(format!(
"error during query processing:{err}",
)));
}
rx.await.map_err(|err| {
jsonrpsee::core::Error::Custom(format!("error during query processing:{err}"))
})
} else {
self.rpc_client
.get_vote_accounts()
.await
.map_err(|err| (jsonrpsee::core::Error::Custom(err.to_string())))
}
}
}

View File

@ -42,7 +42,8 @@ pub struct Config {
pub quic_proxy_addr: Option<String>,
#[serde(default)]
pub use_grpc: bool,
#[serde(default)]
pub calculate_leader_schedule_form_geyser: bool,
#[serde(default = "Config::default_grpc_addr")]
pub grpc_addr: String,
#[serde(default)]

View File

@ -13,8 +13,6 @@ pub mod service_spawner;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
#[from_env]
pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890";
#[from_env]
pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
@ -35,7 +33,7 @@ pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus
TransactionConfirmationStatus::Finalized;
#[from_env]
pub const DEFAULT_GRPC_ADDR: &str = "http://127.0.0.0:10000";
pub const DEFAULT_GRPC_ADDR: &str = "http://localhost:10000";
#[from_env]
pub const GRPC_VERSION: &str = "1.16.1";

View File

@ -1,7 +1,6 @@
pub mod rpc_tester;
use std::time::Duration;
use crate::rpc_tester::RpcTester;
use anyhow::bail;
use dashmap::DashMap;
use lite_rpc::bridge::LiteBridge;
@ -9,11 +8,9 @@ use lite_rpc::cli::Config;
use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use crate::rpc_tester::RpcTester;
use log::info;
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_leaders_getter::GrpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
GrpcConnectionTimeouts, GrpcSourceConfig,
@ -29,14 +26,17 @@ use solana_lite_rpc_core::stores::{
subscription_store::SubscriptionStore,
tx_store::TxStore,
};
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
produced_block::ProducedBlock,
};
use solana_lite_rpc_core::traits::leaders_fetcher_interface::LeaderFetcherInterface;
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
@ -49,7 +49,9 @@ use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
async fn get_latest_block(
mut block_stream: BlockStream,
@ -86,6 +88,7 @@ pub async fn start_postgres(
pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
let grpc_sources = args.get_grpc_sources();
log::info!("grpc_sources:{grpc_sources:?}");
let Config {
lite_rpc_ws_addr,
lite_rpc_http_addr,
@ -97,6 +100,8 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
transaction_retry_after_secs,
quic_proxy_addr,
use_grpc,
calculate_leader_schedule_form_geyser,
grpc_addr,
..
} = args;
@ -128,11 +133,16 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
})
.collect(),
)?
// create_grpc_subscription(
// rpc_client.clone(),
// grpc_addr.clone(),
// GRPC_VERSION.to_string(),
// )?
} else {
info!("Creating RPC poll subscription...");
create_json_rpc_polling_subscription(rpc_client.clone())?
};
let EndpointStreaming {
blocks_notifier,
cluster_info_notifier,
@ -144,7 +154,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?;
let (epoch_data, current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
@ -159,15 +169,16 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
store: Arc::new(DashMap::new()),
},
epoch_data,
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let lata_cache_service = DataCachingService {
let data_cache_service = DataCachingService {
data_cache: data_cache.clone(),
clean_duration: Duration::from_secs(120),
};
// to avoid laggin we resubscribe to block notification
let data_caching_service = lata_cache_service.listen(
let data_caching_service = data_cache_service.listen(
blocks_notifier.resubscribe(),
slot_notifier.resubscribe(),
cluster_info_notifier,
@ -196,8 +207,49 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
prometheus_addr,
data_cache: data_cache.clone(),
};
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
//init grpc leader schedule and vote account is configured.
let (leader_schedule, rpc_stakes_send): (Arc<dyn LeaderFetcherInterface>, Option<_>) =
if use_grpc && calculate_leader_schedule_form_geyser {
//init leader schedule grpc process.
//1) get stored leader schedule and stakes (or via RPC if not present)
solana_lite_rpc_stakevote::bootstrat_literpc_leader_schedule(
rpc_client.url(),
&data_cache,
current_epoch_info.epoch,
)
.await;
//2) start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(
data_cache.clone(),
slot_notifier.resubscribe(),
rpc_stakes_recv,
Arc::clone(&rpc_client),
grpc_addr,
)
.await?;
//
tokio::spawn(async move {
let err = stake_vote_jh.await;
log::error!("Vote and stake Services exit with error: {err:?}");
});
(
Arc::new(GrpcLeaderGetter::new(
Arc::clone(&data_cache.leader_schedule),
data_cache.epoch_data.clone(),
)),
Some(rpc_stakes_send),
)
} else {
(
Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128)),
None,
)
};
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
@ -217,6 +269,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
maximum_retries_per_tx,
slot_notifier.resubscribe(),
);
drop(slot_notifier);
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
@ -231,6 +284,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
data_cache.clone(),
transaction_service,
history,
rpc_stakes_send,
)
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
);

View File

@ -1,24 +1,23 @@
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use solana_rpc_client_api::config::{
RpcBlockConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper,
RpcContextConfig, RpcEncodingConfigWrapper, RpcEpochConfig, RpcGetVoteAccountsConfig,
RpcContextConfig, RpcEncodingConfigWrapper, RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig,
RpcProgramAccountsConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
RpcSignatureSubscribeConfig, RpcSignaturesForAddressConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
};
use solana_rpc_client_api::response::{
Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature,
RpcContactInfo, RpcLeaderSchedule, RpcPerfSample, RpcPrioritizationFee, RpcVersionInfo,
RpcVoteAccountStatus,
RpcContactInfo, RpcPerfSample, RpcPrioritizationFee, RpcVersionInfo, RpcVoteAccountStatus,
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use std::collections::HashMap;
pub type Result<T> = std::result::Result<T, jsonrpsee::core::Error>;
@ -111,22 +110,6 @@ pub trait LiteRpc {
// block: u64,
// ) -> Result<RpcBlockCommitment<BlockCommitmentArray>>;
#[method(name = "getEpochInfo")]
async fn get_epoch_info(&self, config: Option<RpcContextConfig>) -> Result<EpochInfo>;
#[method(name = "getLeaderSchedule")]
async fn get_leader_schedule(
&self,
slot: Option<Slot>,
config: Option<RpcEncodingConfigWrapper<RpcEpochConfig>>,
) -> Result<Option<RpcLeaderSchedule>>;
#[method(name = "getVoteAccounts")]
async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> Result<RpcVoteAccountStatus>;
#[method(name = "getRecentPerformanceSamples")]
async fn get_recent_performance_samples(
&self,
@ -225,4 +208,30 @@ pub trait LiteRpc {
#[subscription(name = "voteSubscribe" => "voteNotification", unsubscribe="voteUnsubscribe", item=RpcVote)]
async fn vote_subscribe(&self) -> SubscriptionResult;
#[method(name = "getEpochInfo")]
async fn get_epoch_info(
&self,
config: Option<RpcContextConfig>,
) -> crate::rpc::Result<EpochInfo>;
#[method(name = "getLeaderSchedule")]
async fn get_leader_schedule(
&self,
slot: Option<u64>,
config: Option<RpcLeaderScheduleConfig>,
) -> crate::rpc::Result<Option<HashMap<String, Vec<usize>>>>;
#[method(name = "getSlotLeaders")]
async fn get_slot_leaders(
&self,
start_slot: u64,
limit: u64,
) -> crate::rpc::Result<Vec<Pubkey>>;
#[method(name = "getVoteAccounts")]
async fn get_vote_accounts(
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus>;
}

View File

@ -95,7 +95,7 @@ Implementation Details
* _proxy_ will maintain multiple quic connection per TPU according to the Solana validator quic policy
* _proxy_ will use lightweight quic streams to send the transactions
* inbound traffic (from Lite RPC)
* client-proxy-communcation is done via QUIC using a custom wire format
* client-proxy-communication is done via QUIC using a custom wire format
* _proxy_ supports only quic ATM but that could be extended to support other protocols
* _proxy_ should perform client authentication by TLS (see [issue](https://github.com/blockworks-foundation/lite-rpc/issues/167))
* _proxy_ uses a single queue (channel) for buffering the transactions from any inbound connection

View File

@ -29,7 +29,7 @@ enum ConnectionState {
}
pub struct AutoReconnect {
// endoint should be configures with keep-alive and idle timeout
// endpoint should be configures with keep-alive and idle timeout
endpoint: Endpoint,
current: RwLock<ConnectionState>,
pub target_address: SocketAddr,

38
stake_vote/Cargo.toml Normal file
View File

@ -0,0 +1,38 @@
[package]
name = "solana-lite-rpc-stakevote"
version = "0.2.3"
edition = "2021"
description = "History implementations used by solana lite rpc"
rust-version = "1.70.0"
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = { workspace = true }
bincode = { workspace = true }
borsh = { workspace = true }
bs58 = { workspace = true }
log = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
solana-sdk = { workspace = true }
solana-client = { workspace = true }
solana-ledger = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-rpc-client = { workspace = true }
solana-version = { workspace = true }
solana-account-decoder = { workspace = true }
solana-program = { workspace = true }
solana-lite-rpc-core = { workspace = true }
futures = { version = "0.3.28", default-features = false }
tokio = { version = "1.28.2", features = ["full"]}
futures-util = "0.3.28"

100
stake_vote/src/account.rs Normal file
View File

@ -0,0 +1,100 @@
use anyhow::bail;
use borsh::BorshDeserialize;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use solana_sdk::stake::state::StakeState;
use solana_sdk::stake_history::StakeHistory;
use solana_sdk::vote::state::VoteState;
use yellowstone_grpc_proto::prelude::SubscribeUpdateAccount;
#[derive(Debug)]
#[allow(dead_code)]
pub struct AccountPretty {
pub is_startup: bool,
pub slot: u64,
pub pubkey: Pubkey,
pub lamports: u64,
pub owner: Pubkey,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub write_version: u64,
pub txn_signature: String,
}
impl AccountPretty {
pub fn new_from_geyser(
geyser_account: SubscribeUpdateAccount,
current_slot: u64,
) -> Option<AccountPretty> {
let Some(inner_account) = geyser_account.account else {
log::warn!("Receive a SubscribeUpdateAccount without account.");
return None;
};
if geyser_account.slot != current_slot {
log::trace!(
"Get geyser account on a different slot:{} of the current:{current_slot}",
geyser_account.slot
);
}
Some(AccountPretty {
is_startup: geyser_account.is_startup,
slot: geyser_account.slot,
pubkey: Pubkey::try_from(inner_account.pubkey).expect("valid pubkey"),
lamports: inner_account.lamports,
owner: Pubkey::try_from(inner_account.owner).expect("valid pubkey"),
executable: inner_account.executable,
rent_epoch: inner_account.rent_epoch,
data: inner_account.data,
write_version: inner_account.write_version,
txn_signature: bs58::encode(inner_account.txn_signature.unwrap_or_default())
.into_string(),
})
}
pub fn read_stake(&self) -> anyhow::Result<Option<Delegation>> {
read_stake_from_account_data(self.data.as_slice())
}
// pub fn read_stake_history(&self) -> Option<StakeHistory> {
// read_historystake_from_account(self.data.as_slice())
// }
pub fn read_vote(&self) -> anyhow::Result<VoteState> {
if self.data.is_empty() {
log::warn!("Vote account with empty data. Can't read vote.");
bail!("Error: read Vote account with empty data");
}
Ok(VoteState::deserialize(&self.data)?)
}
}
impl std::fmt::Display for AccountPretty {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} at slot:{} lpt:{}",
self.pubkey, self.slot, self.lamports
)
}
}
pub fn read_stake_from_account_data(mut data: &[u8]) -> anyhow::Result<Option<Delegation>> {
if data.is_empty() {
log::warn!("Stake account with empty data. Can't read stake.");
bail!("Error: read Stake account with empty data");
}
match BorshDeserialize::deserialize(&mut data)? {
StakeState::Stake(_, stake) => Ok(Some(stake.delegation)),
StakeState::Initialized(_) => Ok(None),
StakeState::Uninitialized => Ok(None),
StakeState::RewardsPool => Ok(None),
}
}
pub fn read_historystake_from_account(account_data: &[u8]) -> Option<StakeHistory> {
//solana_sdk::account::from_account::<StakeHistory, _>(&AccountSharedData::from(account))
bincode::deserialize(account_data).ok()
}

502
stake_vote/src/bootstrap.rs Normal file
View File

@ -0,0 +1,502 @@
use crate::epoch::ScheduleEpochData;
use crate::leader_schedule::LeaderScheduleGeneratedData;
use crate::stake::StakeMap;
use crate::stake::StakeStore;
use crate::utils::{Takable, TakeResult};
use crate::vote::EpochVoteStakes;
use crate::vote::EpochVoteStakesCache;
use crate::vote::VoteMap;
use crate::vote::VoteStore;
use anyhow::bail;
use futures::future::join_all;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::client_error::ClientErrorKind;
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_response::RpcVoteAccountStatus;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_program::slot_history::Slot;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinHandle;
//File where the Vote and stake use to calculate the leader schedule at epoch are stored.
//Use to bootstrap current and next epoch leader schedule.
//TODO to be removed with inter RPC bootstrap and snapshot read.
pub const CURRENT_EPOCH_VOTE_STAKES_FILE: &str = "current_vote_stakes.json";
pub const NEXT_EPOCH_VOTE_STAKES_FILE: &str = "next_vote_stakes.json";
pub async fn bootstrap_schedule_epoch_data(data_cache: &DataCache) -> ScheduleEpochData {
let new_rate_activation_epoch = solana_sdk::feature_set::FeatureSet::default()
.new_warmup_cooldown_rate_epoch(data_cache.epoch_data.get_epoch_schedule());
let bootstrap_epoch = crate::utils::get_current_epoch(data_cache).await;
ScheduleEpochData::new(
bootstrap_epoch.epoch,
bootstrap_epoch.slots_in_epoch,
data_cache
.epoch_data
.get_last_slot_in_epoch(bootstrap_epoch.epoch),
bootstrap_epoch.absolute_slot,
new_rate_activation_epoch,
)
}
// Return the current and next epoxh leader schedule and the current epoch stakes of vote accounts
// if the corresponding files exist.
pub fn bootstrap_leaderschedule_from_files(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> Option<(CalculatedSchedule, RpcVoteAccountStatus)> {
bootstrap_current_leader_schedule(slots_in_epoch, current_epoch_of_loading)
.map(|(leader_schedule, current_epoch_stakes, _)| {
let vote_acccounts = crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
&current_epoch_stakes,
);
(leader_schedule, vote_acccounts)
})
.ok()
}
// Return the current or next epoch leader schedule using the RPC calls.
pub fn bootstrap_leaderschedule_from_rpc(
rpc_url: String,
epoch_schedule: &EpochSchedule,
) -> Result<CalculatedSchedule, ClientError> {
let current_epoch = get_rpc_epoch_info(rpc_url.clone())?;
let current_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), None)?.ok_or(ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
})?;
//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let current_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&current_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;
//get next epoch rpc schedule
let next_epoch = current_epoch.epoch + 1;
let next_first_epoch_slot = epoch_schedule.get_first_slot_in_epoch(next_epoch);
let next_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), Some(next_first_epoch_slot))?.ok_or(
ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
},
)?;
//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let next_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&next_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;
Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule_by_node: current_schedule_by_node.clone(),
schedule_by_slot: current_schedule_by_slot.clone(),
epoch: current_epoch.epoch,
}),
next: Some(LeaderScheduleData {
schedule_by_node: next_schedule_by_node,
schedule_by_slot: next_schedule_by_slot,
epoch: current_epoch.epoch + 1,
}),
})
}
/*
Bootstrap state changes
InitBootstrap
|
|Fetch accounts|
| |
Error BootstrapAccountsFetched(account list)
| |
|Exit| |Extract stores|
| |
Error StoreExtracted(account list, stores)
| |
| Wait(1s)| |Merge accounts in store|
| | |
BootstrapAccountsFetched(account list) Error AccountsMerged(stores)
| |
|Log and skip| |Merges store|
|Account | | |
Error End
|
|never occurs restart|
|
InitBootstrap
*/
pub fn run_bootstrap_events(
event: BootstrapEvent,
bootstrap_tasks: &mut FuturesUnordered<JoinHandle<BootstrapEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> anyhow::Result<Option<anyhow::Result<(CalculatedSchedule, RpcVoteAccountStatus)>>> {
let result = process_bootstrap_event(
event,
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
);
match result {
BootsrapProcessResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
Ok(None)
}
BootsrapProcessResult::Event(event) => run_bootstrap_events(
event,
bootstrap_tasks,
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
),
BootsrapProcessResult::End(leader_schedule_result) => Ok(Some(leader_schedule_result)),
BootsrapProcessResult::Error(err) => bail!(err),
}
}
pub enum BootstrapEvent {
InitBootstrap {
sleep_time: u64,
rpc_url: String,
},
BootstrapAccountsFetched(
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Account,
String,
),
StoreExtracted(
StakeMap,
VoteMap,
EpochVoteStakesCache,
Vec<(Pubkey, Account)>,
Vec<(Pubkey, Account)>,
Account,
String,
),
AccountsMerged(
StakeMap,
VoteMap,
EpochVoteStakesCache,
String,
anyhow::Result<(CalculatedSchedule, RpcVoteAccountStatus)>,
),
Exit,
}
#[allow(clippy::large_enum_variant)] //214 byte large and only use during bootstrap.
enum BootsrapProcessResult {
TaskHandle(JoinHandle<BootstrapEvent>),
Event(BootstrapEvent),
Error(String),
End(anyhow::Result<(CalculatedSchedule, RpcVoteAccountStatus)>),
}
fn process_bootstrap_event(
event: BootstrapEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap {
sleep_time,
rpc_url,
} => {
let jh = tokio::task::spawn_blocking(move || {
if sleep_time > 0 {
std::thread::sleep(Duration::from_secs(sleep_time));
}
match bootstrap_accounts(rpc_url.clone()) {
Ok((stakes, votes, history)) => {
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url)
}
Err(err) => {
log::warn!(
"Bootstrap account error during fetching accounts err:{err}. Exit"
);
BootstrapEvent::Exit
}
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => {
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map((stake_map, (vote_map, epoch_cache))) => {
BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted(
stake_map,
vote_map,
epoch_cache,
stakes,
votes,
history,
rpc_url,
))
}
TakeResult::Taken(stake_notify) => {
let notif_jh = tokio::spawn({
async move {
let notifs = stake_notify
.iter()
.map(|n| n.notified())
.collect::<Vec<tokio::sync::futures::Notified>>();
join_all(notifs).await;
BootstrapEvent::BootstrapAccountsFetched(
stakes, votes, history, rpc_url,
)
}
});
BootsrapProcessResult::TaskHandle(notif_jh)
}
}
}
BootstrapEvent::StoreExtracted(
mut stake_map,
mut vote_map,
mut epoch_cache,
stakes,
votes,
history,
rpc_url,
) => {
let stake_history = crate::account::read_historystake_from_account(&history.data);
if stake_history.is_none() {
return BootsrapProcessResult::Error(
"Bootstrap error, can't read stake history from account data.".to_string(),
);
}
//merge new PA with stake map and vote map in a specific task
let jh = tokio::task::spawn_blocking({
move || {
//update pa_list to set slot update to start epoq one.
crate::stake::merge_program_account_in_strake_map(
&mut stake_map,
stakes,
0, //with RPC no way to know the slot of the account update. Set to 0.
);
crate::vote::merge_program_account_in_vote_map(
&mut vote_map,
votes,
0, //with RPC no way to know the slot of the account update. Set to 0.
);
match bootstrap_current_leader_schedule(
current_epoch_of_loading,
slots_in_epoch,
) {
Ok((leader_schedule, current_epoch_stakes, next_epoch_stakes)) => {
let vote_acccounts =
crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
&current_epoch_stakes,
);
epoch_cache.add_stakes_for_epoch(current_epoch_stakes);
epoch_cache.add_stakes_for_epoch(next_epoch_stakes);
BootstrapEvent::AccountsMerged(
stake_map,
vote_map,
epoch_cache,
rpc_url,
Ok((leader_schedule, vote_acccounts)),
)
}
Err(err) => BootstrapEvent::AccountsMerged(
stake_map,
vote_map,
epoch_cache,
rpc_url,
Err(err),
),
}
}
});
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::AccountsMerged(
stake_map,
vote_map,
epoch_cache,
rpc_url,
leader_schedule_result,
) => {
match (
stakestore.stakes.merge(stake_map),
votestore.votes.merge((vote_map, epoch_cache)),
) {
(Ok(()), Ok(())) => BootsrapProcessResult::End(leader_schedule_result),
_ => {
//TODO remove this error using type state
log::warn!("BootstrapEvent::AccountsMerged merge stake or vote fail, non extracted stake/vote map err, restart bootstrap");
BootsrapProcessResult::Event(BootstrapEvent::InitBootstrap {
sleep_time: 10,
rpc_url,
})
}
}
}
BootstrapEvent::Exit => panic!("Bootstrap account can't be done exit"),
}
}
#[allow(clippy::type_complexity)]
fn bootstrap_accounts(
rpc_url: String,
) -> Result<(Vec<(Pubkey, Account)>, Vec<(Pubkey, Account)>, Account), ClientError> {
get_stake_account(rpc_url)
.and_then(|(stakes, rpc_url)| {
get_vote_account(rpc_url).map(|(votes, rpc_url)| (stakes, votes, rpc_url))
})
.and_then(|(stakes, votes, rpc_url)| {
get_stakehistory_account(rpc_url).map(|history| (stakes, votes, history))
})
}
fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client
.get_program_accounts(&solana_sdk::stake::program::id())
.map(|stake| (stake, rpc_url))
}
fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client
.get_program_accounts(&solana_sdk::vote::program::id())
.map(|votes| (votes, rpc_url))
}
pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_account(&solana_sdk::sysvar::stake_history::id())
}
fn get_rpc_epoch_info(rpc_url: String) -> Result<EpochInfo, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_epoch_info()
}
fn get_rpc_leader_schedule(
rpc_url: String,
slot: Option<Slot>,
) -> Result<Option<HashMap<String, Vec<usize>>>, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_leader_schedule(slot)
}
// pub struct BootstrapScheduleResult {
// schedule: CalculatedSchedule,
// vote_stakes: Vec<EpochVoteStakes>,
// }
pub fn bootstrap_current_leader_schedule(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> anyhow::Result<(CalculatedSchedule, EpochVoteStakes, EpochVoteStakes)> {
let (current_epoch, current_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(CURRENT_EPOCH_VOTE_STAKES_FILE)?;
let (next_epoch, next_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?;
//verify that the current loaded epoch correspond to the current epoch slot
if current_epoch_of_loading != current_epoch {
return Err(ClientError {
request: None,
kind: ClientErrorKind::Custom(
"Current epoch bootstrap file doesn't correspond to the validator current epoch."
.to_string(),
),
}
.into());
}
//calcualte leader schedule for all vote stakes.
let current_schedule = crate::leader_schedule::calculate_leader_schedule(
&current_epoch_stakes,
current_epoch,
slots_in_epoch,
);
let next_schedule = crate::leader_schedule::calculate_leader_schedule(
&next_epoch_stakes,
next_epoch,
slots_in_epoch,
);
Ok((
CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule_by_node: LeaderScheduleGeneratedData::get_schedule_by_nodes(
&current_schedule,
),
schedule_by_slot: current_schedule.get_slot_leaders().to_vec(),
epoch: current_epoch,
}),
next: Some(LeaderScheduleData {
schedule_by_node: LeaderScheduleGeneratedData::get_schedule_by_nodes(
&next_schedule,
),
schedule_by_slot: next_schedule.get_slot_leaders().to_vec(),
epoch: next_epoch,
}),
},
EpochVoteStakes {
epoch: current_epoch,
vote_stakes: current_epoch_stakes,
},
EpochVoteStakes {
epoch: next_epoch,
vote_stakes: next_epoch_stakes,
},
))
}

111
stake_vote/src/epoch.rs Normal file
View File

@ -0,0 +1,111 @@
use crate::leader_schedule::LeaderScheduleEvent;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_sdk::stake_history::StakeHistory;
//#[derive(Debug, Default, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ScheduleEpochData {
pub current_epoch: u64,
pub slots_in_epoch: u64,
pub last_slot_in_epoch: u64,
pub current_confirmed_slot: u64,
pub new_rate_activation_epoch: Option<solana_sdk::clock::Epoch>,
//to start a new epoch and schedule, the new stake history
//Must be notified and the end epoch slot notfied.
//these field store each event.
//If they're defined an new epoch and leader schedule can append.
new_stake_history: Option<StakeHistory>,
next_epoch_change: Option<(u64, u64)>,
}
impl ScheduleEpochData {
pub fn new(
current_epoch: u64,
slots_in_epoch: u64,
last_slot_in_epoch: u64,
current_confirmed_slot: u64,
new_rate_activation_epoch: Option<solana_sdk::clock::Epoch>,
) -> Self {
ScheduleEpochData {
current_epoch,
slots_in_epoch,
last_slot_in_epoch,
current_confirmed_slot,
new_rate_activation_epoch,
new_stake_history: None,
next_epoch_change: None,
}
}
pub async fn process_new_confirmed_slot(
&mut self,
new_slot: u64,
data_cache: &DataCache,
) -> Option<LeaderScheduleEvent> {
if self.current_confirmed_slot < new_slot {
self.current_confirmed_slot = new_slot;
log::trace!("Receive slot slot: {new_slot:?}");
self.manage_change_epoch(data_cache).await
} else {
None
}
}
pub fn set_epoch_stake_history(
&mut self,
history: StakeHistory,
) -> Option<LeaderScheduleEvent> {
log::debug!("set_epoch_stake_history");
self.new_stake_history = Some(history);
self.verify_epoch_change()
}
async fn manage_change_epoch(&mut self, data_cache: &DataCache) -> Option<LeaderScheduleEvent> {
//execute leaderschedule calculus at the last slot of the current epoch.
//account change of the slot has been send at confirmed slot.
//first epoch slot send all stake change and during this send no slot is send.
//to avoid to delay too much the schedule, start the calculus at the end of the epoch.
//the first epoch slot arrive very late cause of the stake account notification from the validator.
if self.current_confirmed_slot >= self.last_slot_in_epoch {
log::debug!(
"manage_change_epoch at slot:{} last_slot_in_epoch:{}",
self.current_confirmed_slot,
self.last_slot_in_epoch
);
let next_epoch = data_cache
.epoch_data
.get_epoch_at_slot(self.last_slot_in_epoch + 1);
let last_slot_in_epoch = data_cache
.epoch_data
.get_last_slot_in_epoch(next_epoch.epoch);
//start leader schedule calculus
//at current epoch change the schedule is calculated for the next epoch.
self.next_epoch_change = Some((next_epoch.epoch, last_slot_in_epoch));
self.verify_epoch_change()
} else {
None
}
}
fn verify_epoch_change(&mut self) -> Option<LeaderScheduleEvent> {
if self.new_stake_history.is_some() && self.next_epoch_change.is_some() {
log::info!("Change epoch at slot:{}", self.current_confirmed_slot);
let (next_epoch, last_slot_in_epoch) = self.next_epoch_change.take().unwrap(); //unwrap tested before.
self.current_epoch = next_epoch;
self.last_slot_in_epoch = last_slot_in_epoch;
//start leader schedule calculus
//at current epoch change the schedule is calculated for the next epoch.
Some(crate::leader_schedule::LeaderScheduleEvent::Init(
self.current_epoch,
self.slots_in_epoch,
self.new_rate_activation_epoch,
self.new_stake_history.take().unwrap(), //unwrap tested before
))
} else {
None
}
}
}

View File

@ -0,0 +1,341 @@
use crate::stake::{StakeMap, StakeStore};
use crate::utils::{Takable, TakeResult};
use crate::vote::EpochVoteStakes;
use crate::vote::EpochVoteStakesCache;
use crate::vote::StoredVote;
use crate::vote::{VoteMap, VoteStore};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use solana_ledger::leader_schedule::LeaderSchedule;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake_history::StakeHistory;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::task::JoinHandle;
#[derive(Debug)]
pub struct LeaderScheduleGeneratedData {
pub schedule: LeaderSchedule,
pub rpc_data: LeaderScheduleData,
pub epoch: u64,
}
impl LeaderScheduleGeneratedData {
pub fn get_schedule_by_nodes(schedule: &LeaderSchedule) -> HashMap<String, Vec<usize>> {
schedule
.get_slot_leaders()
.iter()
.enumerate()
.map(|(i, pk)| (pk.to_string(), i))
.into_group_map()
.into_iter()
.collect()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct EpochStake {
epoch: u64,
stake_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
}
/*
Leader schedule calculus state diagram
InitLeaderschedule
|
|extract store stake and vote|
| |
Error CalculateScedule(stakes, votes)
| |
| Wait(1s)| |Calculate schedule|
| |
InitLeaderscedule MergeStore(stakes, votes, schedule)
| |
Error SaveSchedule(schedule)
| |
|never occurs restart (wait 1s)| |save schedule and verify (opt)|
|
InitLeaderscedule
*/
#[allow(clippy::large_enum_variant)] //256 byte large and only use during schedule calculus.
pub enum LeaderScheduleEvent {
Init(u64, u64, Option<solana_sdk::clock::Epoch>, StakeHistory),
MergeStoreAndSaveSchedule(
StakeMap,
VoteMap,
EpochVoteStakesCache,
LeaderScheduleGeneratedData,
(u64, u64, Option<solana_sdk::clock::Epoch>),
StakeHistory,
),
}
enum LeaderScheduleResult {
TaskHandle(JoinHandle<LeaderScheduleEvent>),
Event(LeaderScheduleEvent),
End(LeaderScheduleGeneratedData),
}
//Execute the leader schedule process.
pub fn run_leader_schedule_events(
event: LeaderScheduleEvent,
schedule_tasks: &mut FuturesUnordered<JoinHandle<LeaderScheduleEvent>>,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> Option<LeaderScheduleGeneratedData> {
let result = process_leadershedule_event(event, stakestore, votestore);
match result {
LeaderScheduleResult::TaskHandle(jh) => {
schedule_tasks.push(jh);
None
}
LeaderScheduleResult::Event(event) => {
run_leader_schedule_events(event, schedule_tasks, stakestore, votestore)
}
LeaderScheduleResult::End(schedule) => Some(schedule),
}
}
fn process_leadershedule_event(
// rpc_url: String,
event: LeaderScheduleEvent,
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
) -> LeaderScheduleResult {
match event {
LeaderScheduleEvent::Init(
new_epoch,
slots_in_epoch,
new_rate_activation_epoch,
stake_history,
) => {
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map((stake_map, (vote_map, mut epoch_cache))) => {
log::info!("Start calculate leader schedule");
//do the calculus in a blocking task.
let jh = tokio::task::spawn_blocking({
move || {
let epoch_vote_stakes = calculate_epoch_stakes(
&stake_map,
&vote_map,
new_epoch,
&stake_history,
new_rate_activation_epoch,
);
let next_epoch = new_epoch + 1;
let leader_schedule = calculate_leader_schedule(
&epoch_vote_stakes,
next_epoch,
slots_in_epoch,
);
if std::path::Path::new(crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE)
.exists()
{
if let Err(err) = std::fs::rename(
crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE,
crate::bootstrap::CURRENT_EPOCH_VOTE_STAKES_FILE,
) {
log::error!(
"Fail to rename current leader schedule on disk because :{err}"
);
}
}
//save new vote stake in a file for bootstrap.
if let Err(err) = crate::utils::save_schedule_vote_stakes(
crate::bootstrap::NEXT_EPOCH_VOTE_STAKES_FILE,
&epoch_vote_stakes,
next_epoch,
) {
log::error!(
"Error during saving the new leader schedule of epoch:{} in a file error:{err}",
next_epoch
);
}
epoch_cache.add_stakes_for_epoch(EpochVoteStakes {
epoch: new_epoch,
vote_stakes: epoch_vote_stakes,
});
log::info!("End calculate leader schedule");
let rpc_data = LeaderScheduleData {
schedule_by_node:
LeaderScheduleGeneratedData::get_schedule_by_nodes(
&leader_schedule,
),
schedule_by_slot: leader_schedule.get_slot_leaders().to_vec(),
epoch: next_epoch,
};
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
epoch_cache,
LeaderScheduleGeneratedData {
schedule: leader_schedule,
rpc_data,
epoch: next_epoch,
},
(new_epoch, slots_in_epoch, new_rate_activation_epoch),
stake_history,
)
}
});
LeaderScheduleResult::TaskHandle(jh)
}
TakeResult::Taken(stake_notify) => {
let notif_jh = tokio::spawn({
async move {
let notifs = stake_notify
.iter()
.map(|n| n.notified())
.collect::<Vec<tokio::sync::futures::Notified>>();
join_all(notifs).await;
LeaderScheduleEvent::Init(
new_epoch,
slots_in_epoch,
new_rate_activation_epoch,
stake_history,
)
}
});
LeaderScheduleResult::TaskHandle(notif_jh)
}
}
}
LeaderScheduleEvent::MergeStoreAndSaveSchedule(
stake_map,
vote_map,
epoch_cache,
schedule_data,
(new_epoch, slots_in_epoch, epoch_schedule),
stake_history,
) => {
match (
stakestore.stakes.merge(stake_map),
votestore.votes.merge((vote_map, epoch_cache)),
) {
(Ok(()), Ok(())) => LeaderScheduleResult::End(schedule_data),
_ => {
//this shoud never arrive because the store has been extracted before.
//TODO remove this error using type state
log::warn!("LeaderScheduleEvent::MergeStoreAndSaveSchedule merge stake or vote fail, -restart Schedule");
LeaderScheduleResult::Event(LeaderScheduleEvent::Init(
new_epoch,
slots_in_epoch,
epoch_schedule,
stake_history,
))
}
}
}
}
}
fn calculate_epoch_stakes(
stake_map: &StakeMap,
vote_map: &VoteMap,
new_epoch: u64,
stake_history: &StakeHistory,
new_rate_activation_epoch: Option<solana_sdk::clock::Epoch>,
) -> HashMap<Pubkey, (u64, Arc<StoredVote>)> {
//calculate schedule stakes at beginning of new epoch.
//Next epoch schedule use the stake at the beginning of last epoch.
let delegated_stakes: HashMap<Pubkey, u64> =
stake_map
.values()
.fold(HashMap::default(), |mut delegated_stakes, stake_account| {
let delegation = stake_account.stake;
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry +=
delegation.stake(new_epoch, Some(stake_history), new_rate_activation_epoch);
delegated_stakes
});
let staked_vote_map: HashMap<Pubkey, (u64, Arc<StoredVote>)> = vote_map
.values()
.map(|vote_account| {
let delegated_stake = delegated_stakes
.get(&vote_account.pubkey)
.copied()
.unwrap_or_else(|| {
log::info!(
"calculate_epoch_stakes stake with no vote account:{}",
vote_account.pubkey
);
Default::default()
});
(vote_account.pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
staked_vote_map
}
//Copied from leader_schedule_utils.rs
// Mostly cribbed from leader_schedule_utils
pub fn calculate_leader_schedule(
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
slots_in_epoch: u64,
) -> LeaderSchedule {
let stakes_map: HashMap<Pubkey, u64> = stake_vote_map
.iter()
.filter_map(|(_, (stake, vote_account))| {
(*stake != 0u64).then_some((vote_account.vote_data.node_pubkey, *stake))
})
.into_grouping_map()
.aggregate(|acc, _node_pubkey, stake| Some(acc.unwrap_or_default() + stake));
let mut stakes: Vec<(Pubkey, u64)> = stakes_map
.into_iter()
.map(|(key, stake)| (key, stake))
.collect();
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(&mut stakes);
//log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS)
}
pub fn calculate_slot_leaders_from_schedule(
leader_scheudle: &HashMap<String, Vec<usize>>,
) -> Result<Vec<Pubkey>, String> {
let mut slot_leaders_map = BTreeMap::new();
for (pk, index_list) in leader_scheudle {
for index in index_list {
let pubkey = Pubkey::from_str(pk)
.map_err(|err| format!("Pubkey from leader schedule not a plublic key:{err}"))?;
slot_leaders_map.insert(index, pubkey);
}
}
Ok(slot_leaders_map.into_values().collect())
}
// Cribbed from leader_schedule_utils
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
// Note: Use unstable sort, because we dedup right after to remove the equal elements.
stakes.sort_unstable_by(|(l_pubkey, l_stake), (r_pubkey, r_stake)| {
if r_stake == l_stake {
r_pubkey.cmp(l_pubkey)
} else {
r_stake.cmp(l_stake)
}
});
// Now that it's sorted, we can do an O(n) dedup.
stakes.dedup();
}

406
stake_vote/src/lib.rs Normal file
View File

@ -0,0 +1,406 @@
use crate::account::AccountPretty;
use crate::bootstrap::BootstrapEvent;
use futures::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_lite_rpc_core::stores::block_information_store::BlockInformation;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_lite_rpc_core::types::SlotStream;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::CommitmentLevel;
use yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
use yellowstone_grpc_proto::prelude::SubscribeUpdate;
use yellowstone_grpc_proto::tonic::Status;
mod account;
mod bootstrap;
mod epoch;
mod leader_schedule;
mod rpcrequest;
mod stake;
mod utils;
mod vote;
// pub use bootstrap::{bootstrap_leaderschedule_from_files, bootstrap_leaderschedule_from_rpc};
const STAKESTORE_INITIAL_CAPACITY: usize = 600000;
const VOTESTORE_INITIAL_CAPACITY: usize = 600000;
type Slot = u64;
pub async fn bootstrat_literpc_leader_schedule(
rpc_url: String,
data_cache: &DataCache,
current_epoch_of_loading: u64,
) {
//init leader schedule grpc process.
//1) get stored schedule and stakes
let slots_per_epoch = data_cache.epoch_data.get_epoch_schedule().slots_per_epoch;
match crate::bootstrap::bootstrap_leaderschedule_from_files(
current_epoch_of_loading,
slots_per_epoch,
) {
Some((leader_schedule, vote_stakes)) => {
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes)
.await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
None => {
log::info!("Leader schedule bootstrap file not found. Try to boot from rpc.");
match crate::bootstrap::bootstrap_leaderschedule_from_rpc(
rpc_url,
data_cache.epoch_data.get_epoch_schedule(),
) {
Ok(leader_schedule) => {
log::info!("Leader schedule bootstrap from rpc done.",);
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}
Err(err) => {
log::warn!(
"An error occurs during bootstrap of the leader schedule using rpc:{err}"
);
log::warn!("No schedule has been loaded");
}
}
}
}
}
pub async fn start_stakes_and_votes_loop(
data_cache: DataCache,
mut slot_notification: SlotStream,
mut vote_account_rpc_request: Receiver<(
GetVoteAccountsConfig,
tokio::sync::oneshot::Sender<RpcVoteAccountStatus>,
)>,
rpc_client: Arc<RpcClient>,
grpc_url: String,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
log::info!("Start Stake and Vote loop on :{grpc_url}.");
let mut stake_vote_geyser_stream = subscribe_geyser_stake_vote_owner(grpc_url.clone()).await?;
let mut stake_history_geyser_stream = subscribe_geyser_stake_history(grpc_url).await?;
log::info!("Stake and Vote geyser subscription done.");
let jh = tokio::spawn(async move {
//Stake account management struct
let mut stakestore = stake::StakeStore::new(STAKESTORE_INITIAL_CAPACITY);
//Vote account management struct
let mut votestore = vote::VoteStore::new(VOTESTORE_INITIAL_CAPACITY);
//Init bootstrap process
let mut current_schedule_epoch =
crate::bootstrap::bootstrap_schedule_epoch_data(&data_cache).await;
//future execution collection.
let mut spawned_leader_schedule_task = FuturesUnordered::new();
let mut spawned_bootstrap_task = FuturesUnordered::new();
let jh = tokio::spawn(async move {
BootstrapEvent::InitBootstrap {
sleep_time: 1,
rpc_url: rpc_client.url(),
}
});
spawned_bootstrap_task.push(jh);
let mut rpc_request_processor = crate::rpcrequest::RpcRequestData::new();
let mut bootstrap_done = false;
//for test to count the number of account notified at epoch change.
let mut account_update_notification = None;
let mut epoch_wait_account_notification_task = FuturesUnordered::new();
loop {
tokio::select! {
//manage confirm new slot notification to detect epoch change.
Ok(_) = slot_notification.recv() => {
//log::info!("Stake and Vote receive a slot.");
let new_slot = solana_lite_rpc_core::solana_utils::get_current_confirmed_slot(&data_cache).await;
let schedule_event = current_schedule_epoch.process_new_confirmed_slot(new_slot, &data_cache).await;
if bootstrap_done {
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
//for test to count the number of account notified at epoch change.
account_update_notification = Some(0);
let jh = tokio::spawn(async move {
//sleep 3 minutes and count the number of account notification.
tokio::time::sleep(tokio::time::Duration::from_secs(180)).await;
});
epoch_wait_account_notification_task.push(jh);
}
}
}
Some(Ok(())) = epoch_wait_account_notification_task.next() => {
log::info!("Epoch change account count:{} during 3mn", account_update_notification.as_ref().unwrap_or(&0));
account_update_notification = None;
}
Some((config, return_channel)) = vote_account_rpc_request.recv() => {
let commitment = config.commitment.unwrap_or(CommitmentConfig::confirmed());
let BlockInformation { slot, .. } = data_cache
.block_information_store
.get_latest_block(commitment)
.await;
let current_epoch = data_cache.get_current_epoch(commitment).await;
rpc_request_processor.process_get_vote_accounts(slot, current_epoch.epoch, config, return_channel, &mut votestore).await;
}
//manage rpc waiting request notification.
Some(Ok((votes, vote_accounts, rpc_vote_accounts))) = rpc_request_processor.rpc_exec_task.next() => {
rpc_request_processor.notify_end_rpc_get_vote_accounts(
votes,
vote_accounts,
rpc_vote_accounts,
&mut votestore,
).await;
}
//manage rpc waiting request notification.
Some(Ok((current_slot, epoch, config))) = rpc_request_processor.rpc_notify_task.next() => {
rpc_request_processor.take_vote_accounts_and_process(&mut votestore, current_slot, epoch, config).await;
}
//manage geyser stake_history notification
ret = stake_history_geyser_stream.next() => {
match ret {
Some(Ok(msg)) => {
if let Some(UpdateOneof::Account(account)) = msg.update_oneof {
if let Some(account) = account.account {
let acc_id = Pubkey::try_from(account.pubkey).expect("valid pubkey");
if acc_id == solana_sdk::sysvar::stake_history::ID {
log::debug!("Geyser notifstake_history");
match crate::account::read_historystake_from_account(account.data.as_slice()) {
Some(stake_history) => {
let schedule_event = current_schedule_epoch.set_epoch_stake_history(stake_history);
if bootstrap_done {
if let Some(init_event) = schedule_event {
crate::leader_schedule::run_leader_schedule_events(
init_event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
}
}
}
None => log::error!("Bootstrap error, can't read stake history from geyser account data."),
}
}
}
}
},
None | Some(Err(_)) => {
//TODO Restart geyser connection and the bootstrap.
log::error!("The stake_history geyser stream close or in error try to reconnect and resynchronize.");
break;
}
}
}
//manage geyser account notification
//Geyser delete account notification patch must be installed on the validator.
//see https://github.com/solana-labs/solana/pull/33292
ret = stake_vote_geyser_stream.next() => {
match ret {
Some(message) => {
//process the message
match message {
Ok(msg) => {
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
// log::info!("Stake and Vote geyser receive an account:{}.",
// account.account.clone().map(|a|
// solana_sdk::pubkey::Pubkey::try_from(a.pubkey).map(|k| k.to_string())
// .unwrap_or("bad pubkey".to_string()).to_string())
// .unwrap_or("no content".to_string())
// );
//store new account stake.
let current_slot = solana_lite_rpc_core::solana_utils::get_current_confirmed_slot(&data_cache).await;
if let Some(account) = AccountPretty::new_from_geyser(account, current_slot) {
match account.owner {
solana_sdk::stake::program::ID => {
log::trace!("Geyser notif stake account:{}", account);
if let Some(ref mut counter) = account_update_notification {
*counter +=1;
}
if let Err(err) = stakestore.notify_stake_change(
account,
current_schedule_epoch.last_slot_in_epoch,
) {
log::warn!("Can't add new stake from account data err:{}", err);
continue;
}
}
solana_sdk::vote::program::ID => {
//log::info!("Geyser notif VOTE account:{}", account);
let account_pubkey = account.pubkey;
//process vote accout notification
if let Err(err) = votestore.notify_vote_change(account, current_schedule_epoch.last_slot_in_epoch) {
log::warn!("Can't add new stake from account data err:{} account:{}", err, account_pubkey);
continue;
}
}
_ => log::warn!("receive an account notification from a unknown owner:{account:?}"),
}
}
}
Some(UpdateOneof::Ping(_)) => log::trace!("UpdateOneof::Ping"),
Some(UpdateOneof::Slot(slot)) => {
log::trace!("Receive slot slot: {slot:?}");
}
bad_msg => {
log::info!("Geyser stream unexpected message received:{:?}", bad_msg);
}
}
}
Err(error) => {
log::error!("Geyser stream receive an error has message: {error:?}, try to reconnect and resynchronize.");
//todo reconnect and resynchronize.
//break;
}
}
}
None => {
//TODO Restart geyser connection and the bootstrap.
log::error!("The geyser stream close try to reconnect and resynchronize.");
break;
}
}
}
//manage bootstrap event
Some(Ok(event)) = spawned_bootstrap_task.next() => {
match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch, current_schedule_epoch.current_epoch) {
Ok(Some(boot_res))=> {
match boot_res {
Ok((current_schedule_data, vote_stakes)) => {
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes).await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = current_schedule_data;
}
Err(err) => {
log::warn!("Error during current leader schedule bootstrap from files:{err}")
}
}
log::info!("Bootstrap done.");
//update current epoch to manage epoch change during bootstrap.
current_schedule_epoch = crate::bootstrap::bootstrap_schedule_epoch_data(&data_cache).await;
bootstrap_done = true;
},
Ok(None) => (),
Err(err) => log::error!("Stake / Vote Account bootstrap fail because '{err}'"),
}
}
//Manage leader schedule generation process
Some(Ok(event)) = spawned_leader_schedule_task.next() => {
let new_leader_schedule = crate::leader_schedule::run_leader_schedule_events(
event,
&mut spawned_leader_schedule_task,
&mut stakestore,
&mut votestore,
);
if let Some(new_leader_schedule) = new_leader_schedule {
//clone old schedule values is there's other use.
//only done once epoch. Avoid to use a Mutex.
log::info!("End leader schedule calculus for epoch:{}", new_leader_schedule.epoch);
let mut data_schedule = data_cache.leader_schedule.write().await;
data_schedule.current = data_schedule.next.take();
data_schedule.next = Some(new_leader_schedule.rpc_data);
}
}
}
}
});
Ok(jh)
}
//subscribe Geyser grpc
async fn subscribe_geyser_stake_vote_owner(
grpc_url: String,
) -> anyhow::Result<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?;
//account subscription
let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
accounts.insert(
"stake_vote".to_owned(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![
solana_sdk::stake::program::ID.to_string(),
solana_sdk::vote::program::ID.to_string(),
],
filters: vec![],
},
);
let confirmed_stream = client
.subscribe_once(
Default::default(), //slots
accounts.clone(), //accounts
Default::default(), //tx
Default::default(), //entry
Default::default(), //full block
Default::default(), //block meta
Some(CommitmentLevel::Confirmed),
vec![],
None,
)
.await?;
Ok(confirmed_stream)
}
//subscribe Geyser grpc
async fn subscribe_geyser_stake_history(
grpc_url: String,
) -> anyhow::Result<impl Stream<Item = Result<SubscribeUpdate, Status>>> {
let mut client = GeyserGrpcClient::connect(grpc_url, None::<&'static str>, None)?;
//account subscription
let mut accounts: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();
accounts.insert(
"stake_history".to_owned(),
SubscribeRequestFilterAccounts {
account: vec![solana_sdk::sysvar::stake_history::ID.to_string()],
owner: vec![],
filters: vec![],
},
);
let confirmed_stream = client
.subscribe_once(
Default::default(), //slots
accounts.clone(), //accounts
Default::default(), //tx
Default::default(), //entry
Default::default(), //full block
Default::default(), //block meta
Some(CommitmentLevel::Confirmed),
vec![],
None,
)
.await?;
Ok(confirmed_stream)
}

View File

@ -0,0 +1,120 @@
use crate::utils::wait_for_merge_or_get_content;
use crate::utils::Takable;
use crate::vote::EpochVoteStakesCache;
use crate::vote::VoteMap;
use crate::vote::VoteStore;
use crate::Slot;
use futures_util::stream::FuturesUnordered;
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
pub struct RpcRequestData {
pub rpc_notify_task: FuturesUnordered<JoinHandle<(u64, u64, GetVoteAccountsConfig)>>,
pub rpc_exec_task:
FuturesUnordered<JoinHandle<(VoteMap, EpochVoteStakesCache, RpcVoteAccountStatus)>>,
pending_rpc_request: Option<Vec<oneshot::Sender<RpcVoteAccountStatus>>>,
}
impl RpcRequestData {
pub fn new() -> Self {
RpcRequestData {
rpc_notify_task: FuturesUnordered::new(),
rpc_exec_task: FuturesUnordered::new(),
pending_rpc_request: None,
}
}
pub async fn process_get_vote_accounts(
&mut self,
current_slot: Slot,
epoch: u64,
config: GetVoteAccountsConfig,
return_channel: oneshot::Sender<RpcVoteAccountStatus>,
votestore: &mut VoteStore,
) {
match self.pending_rpc_request {
Some(ref mut pending) => pending.push(return_channel),
None => {
self.pending_rpc_request = Some(vec![return_channel]);
}
}
self.take_vote_accounts_and_process(votestore, current_slot, epoch, config)
.await;
}
pub async fn notify_end_rpc_get_vote_accounts(
&mut self,
votes: VoteMap,
vote_accounts: EpochVoteStakesCache,
rpc_vote_accounts: RpcVoteAccountStatus,
votestore: &mut VoteStore,
) {
if let Err(err) = votestore.votes.merge((votes, vote_accounts)) {
log::error!("Error during RPC get vote account merge:{err}");
}
//avoid clone on the first request
if let Some(mut pending_rpc_request) = self.pending_rpc_request.take() {
if pending_rpc_request.len() > 1 {
for return_channel in pending_rpc_request.drain(0..pending_rpc_request.len() - 1) {
if return_channel.send(rpc_vote_accounts.clone()).is_err() {
log::error!("Vote accounts RPC channel send closed.");
}
}
}
if pending_rpc_request
.pop()
.unwrap()
.send(rpc_vote_accounts)
.is_err()
{
log::error!("Vote accounts RPC channel send closed.");
}
}
}
pub async fn take_vote_accounts_and_process(
&mut self,
votestore: &mut VoteStore,
current_slot: Slot,
epoch: u64,
config: GetVoteAccountsConfig,
) {
if let Some(((votes, vote_accounts), (current_slot, epoch, config))) =
wait_for_merge_or_get_content(
&mut votestore.votes,
(current_slot, epoch, config),
&mut self.rpc_notify_task,
)
.await
{
//validate that we have the epoch.
let jh = tokio::task::spawn_blocking({
move || match vote_accounts.vote_stakes_for_epoch(epoch) {
Some(stakes) => {
let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info(
current_slot,
&votes,
&stakes.vote_stakes,
config,
);
(votes, vote_accounts, rpc_vote_accounts)
}
None => {
log::warn!("Get vote account for epoch:{epoch}. No data available");
(
votes,
vote_accounts,
RpcVoteAccountStatus {
current: vec![],
delinquent: vec![],
},
)
}
}
});
self.rpc_exec_task.push(jh);
}
}
}

149
stake_vote/src/stake.rs Normal file
View File

@ -0,0 +1,149 @@
use crate::utils::TakableContent;
use crate::utils::TakableMap;
use crate::utils::UpdateAction;
use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::stake::state::Delegation;
use std::collections::HashMap;
pub type StakeMap = HashMap<Pubkey, StoredStake>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StoredStake {
pub pubkey: Pubkey,
pub lamports: u64,
pub stake: Delegation,
pub last_update_slot: Slot,
pub write_version: u64,
}
impl TakableContent<StoredStake> for StakeMap {
fn add_value(&mut self, val: UpdateAction<StoredStake>) {
StakeStore::process_stake_action(self, val);
}
}
#[derive(Debug, Default)]
pub struct StakeStore {
pub stakes: TakableMap<StoredStake, StakeMap>,
}
impl StakeStore {
pub fn new(capacity: usize) -> Self {
StakeStore {
stakes: TakableMap::new(HashMap::with_capacity(capacity)),
}
}
pub fn notify_stake_change(
&mut self,
account: AccountPretty,
current_end_epoch_slot: Slot,
) -> anyhow::Result<()> {
//if lamport == 0 the account has been removed.
if account.lamports == 0 {
self.stakes.add_value(
UpdateAction::Remove(account.pubkey, account.slot),
account.slot > current_end_epoch_slot,
);
} else {
let Ok(delegated_stake_opt) = account.read_stake() else {
bail!("Can't read stake from account data");
};
if let Some(delegated_stake) = delegated_stake_opt {
let stake = StoredStake {
pubkey: account.pubkey,
lamports: account.lamports,
stake: delegated_stake,
last_update_slot: account.slot,
write_version: account.write_version,
};
let action_update_slot = stake.last_update_slot;
self.stakes.add_value(
UpdateAction::Notify(action_update_slot, stake),
action_update_slot > current_end_epoch_slot,
);
}
}
Ok(())
}
fn process_stake_action(stakes: &mut StakeMap, action: UpdateAction<StoredStake>) {
match action {
UpdateAction::Notify(_, stake) => {
Self::notify_stake(stakes, stake);
}
UpdateAction::Remove(account_pk, slot) => Self::remove_stake(stakes, &account_pk, slot),
}
}
fn notify_stake(map: &mut StakeMap, stake: StoredStake) {
//log::info!("stake_map_notify_stake stake:{stake:?}");
match map.entry(stake.pubkey) {
// If value already exists, then increment it by one
std::collections::hash_map::Entry::Occupied(occupied) => {
let strstake = occupied.into_mut(); // <-- get mut reference to existing value
//doesn't erase new state with an old one. Can arrive during bootstrapping.
//several instructions can be done in the same slot.
if strstake.last_update_slot <= stake.last_update_slot {
log::trace!("stake_map_notify_stake Stake store updated stake: {} old_stake:{strstake:?} stake:{stake:?}", stake.pubkey);
*strstake = stake;
}
}
// If value doesn't exist yet, then insert a new value of 1
std::collections::hash_map::Entry::Vacant(vacant) => {
log::trace!(
"stake_map_notify_stake Stake store insert stake: {} stake:{stake:?}",
stake.pubkey
);
vacant.insert(stake);
}
};
}
fn remove_stake(stakes: &mut StakeMap, account_pk: &Pubkey, update_slot: Slot) {
if stakes
.get(account_pk)
.map(|stake| stake.last_update_slot <= update_slot)
.unwrap_or(false)
{
log::info!("Stake remove_from_store for {}", account_pk.to_string());
stakes.remove(account_pk);
}
}
}
pub fn merge_program_account_in_strake_map(
stake_map: &mut StakeMap,
stakes_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
) {
stakes_list
.into_iter()
.filter_map(|(pk, account)| {
match crate::account::read_stake_from_account_data(&account.data) {
Ok(opt_stake) => opt_stake.map(|stake| (pk, stake, account.lamports)),
Err(err) => {
log::warn!("Error during pa account data deserialisation:{err}");
None
}
}
})
.for_each(|(pk, delegated_stake, lamports)| {
let stake = StoredStake {
pubkey: pk,
lamports,
stake: delegated_stake,
last_update_slot,
write_version: 0,
};
StakeStore::notify_stake(stake_map, stake);
});
}

303
stake_vote/src/utils.rs Normal file
View File

@ -0,0 +1,303 @@
use crate::vote::StoredVote;
use crate::Slot;
use anyhow::bail;
use futures_util::future::join_all;
use futures_util::stream::FuturesUnordered;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::epoch::Epoch as LiteRpcEpoch;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::default::Default;
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
pub async fn get_current_epoch(data_cache: &DataCache) -> LiteRpcEpoch {
let commitment = CommitmentConfig::confirmed();
data_cache.get_current_epoch(commitment).await
}
//Read save epoch vote stake to bootstrap current leader shedule and get_vote_account.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct StringSavedStake {
epoch: u64,
stake_vote_map: HashMap<String, (u64, Arc<StoredVote>)>,
}
#[allow(clippy::type_complexity)]
pub fn read_schedule_vote_stakes(
file_path: &str,
) -> anyhow::Result<(u64, HashMap<Pubkey, (u64, Arc<StoredVote>)>)> {
let content = std::fs::read_to_string(file_path)?;
let stakes_str: StringSavedStake = serde_json::from_str(&content)?;
//convert to EpochStake because json hashmap parser can only have String key.
let ret_stakes = stakes_str
.stake_vote_map
.into_iter()
.map(|(pk, st)| (Pubkey::from_str(&pk).unwrap(), (st.0, st.1)))
.collect();
Ok((stakes_str.epoch, ret_stakes))
}
pub fn save_schedule_vote_stakes(
base_file_path: &str,
stake_vote_map: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
epoch: u64,
) -> anyhow::Result<()> {
//save new schedule for restart.
//need to convert hahsmap key to String because json aloow only string
//key for dictionnary.
//it's better to use json because the file can use to very some stake by hand.
//in the end it will be removed with the bootstrap process.
let save_stakes = StringSavedStake {
epoch,
stake_vote_map: stake_vote_map
.iter()
.map(|(pk, st)| (pk.to_string(), (st.0, Arc::clone(&st.1))))
.collect(),
};
let serialized_stakes = serde_json::to_string(&save_stakes).unwrap();
let mut file = File::create(base_file_path).unwrap();
file.write_all(serialized_stakes.as_bytes()).unwrap();
file.flush().unwrap();
Ok(())
}
#[derive(Debug)]
pub enum UpdateAction<Account> {
Notify(Slot, Account),
Remove(Pubkey, Slot),
}
pub enum TakeResult<C> {
//Vec because can wait on several collection to be merged
Taken(Vec<Arc<Notify>>),
Map(C),
}
impl<C1> TakeResult<C1> {
pub fn and_then<C2>(self, action: TakeResult<C2>) -> TakeResult<(C1, C2)> {
match (self, action) {
(TakeResult::Taken(mut notif1), TakeResult::Taken(mut notif2)) => {
notif1.append(&mut notif2);
TakeResult::Taken(notif1)
}
(TakeResult::Map(content1), TakeResult::Map(content2)) => {
TakeResult::Map((content1, content2))
}
_ => unreachable!("Bad take result association."), //TODO add mix result.
}
}
}
//Takable struct code
pub trait TakableContent<T>: Default {
fn add_value(&mut self, val: UpdateAction<T>);
}
//Takable struct code
pub trait Takable<C> {
fn take(self) -> TakeResult<C>;
fn merge(self, content: C) -> anyhow::Result<()>;
fn is_taken(&self) -> bool;
}
impl<'a, T, C: TakableContent<T>> Takable<C> for &'a mut TakableMap<T, C> {
fn take(self) -> TakeResult<C> {
match self.content.take() {
Some(content) => TakeResult::Map(content),
None => TakeResult::Taken(vec![Arc::clone(&self.notifier)]),
}
}
fn merge(self, mut content: C) -> anyhow::Result<()> {
if self.content.is_none() {
//apply stake added during extraction.
for val in self.updates.drain(..) {
content.add_value(val);
}
self.content = Some(content);
self.notifier.notify_one();
Ok(())
} else {
bail!("TakableMap with a existing content".to_string())
}
}
fn is_taken(&self) -> bool {
self.content.is_none()
}
}
impl<'a, T1, T2, C1: TakableContent<T1>, C2: TakableContent<T2>> Takable<(C1, C2)>
for (&'a mut TakableMap<T1, C1>, &'a mut TakableMap<T2, C2>)
{
fn take(self) -> TakeResult<(C1, C2)> {
let first = self.0;
let second = self.1;
match (first.is_taken(), second.is_taken()) {
(true, true) | (false, false) => first.take().and_then(second.take()),
(true, false) => {
match first.take() {
TakeResult::Taken(notif) => TakeResult::Taken(notif),
TakeResult::Map(_) => unreachable!(), //tested before.
}
}
(false, true) => {
match second.take() {
TakeResult::Taken(notif) => TakeResult::Taken(notif),
TakeResult::Map(_) => unreachable!(), //tested before.
}
}
}
}
fn merge(self, content: (C1, C2)) -> anyhow::Result<()> {
self.0
.merge(content.0)
.and_then(|_| self.1.merge(content.1))
}
fn is_taken(&self) -> bool {
self.0.is_taken() && self.1.is_taken()
}
}
pub async fn wait_for_merge_or_get_content<NotifyContent: std::marker::Send + 'static, C>(
take_map: impl Takable<C>,
notify_content: NotifyContent,
waiter_futures: &mut FuturesUnordered<JoinHandle<NotifyContent>>,
) -> Option<(C, NotifyContent)> {
match take_map.take() {
TakeResult::Map(content) => Some((content, notify_content)),
TakeResult::Taken(stake_notify) => {
let notif_jh = tokio::spawn({
async move {
let notifs = stake_notify
.iter()
.map(|n| n.notified())
.collect::<Vec<tokio::sync::futures::Notified>>();
join_all(notifs).await;
notify_content
}
});
waiter_futures.push(notif_jh);
None
}
}
}
///A struct that hold a collection call content that can be taken during some time and merged after.
///During the time the content is taken, new added values are cached and added to the content after the merge.
///It allow to process struct content while allowing to still update it without lock.
#[derive(Default, Debug)]
pub struct TakableMap<T, C: TakableContent<T>> {
pub content: Option<C>,
pub updates: Vec<UpdateAction<T>>,
notifier: Arc<Notify>,
}
impl<T: Default, C: TakableContent<T> + Default> TakableMap<T, C> {
pub fn new(content: C) -> Self {
TakableMap {
content: Some(content),
updates: vec![],
notifier: Arc::new(Notify::new()),
}
}
//add a value to the content if not taken or put it in the update waiting list.
//Use force_in_update to force the insert in update waiting list.
pub fn add_value(&mut self, val: UpdateAction<T>, force_in_update: bool) {
//during extract push the new update or
//don't insert now account change that has been done in next epoch.
//put in update pool to be merged next epoch change.
//log::info!("tm u:{} c:{} f:{}", self.updates.len(), self.content.is_none(), force_in_update);
match self.content.is_none() || force_in_update {
true => self.updates.push(val),
false => {
let content = self.content.as_mut().unwrap(); //unwrap tested
content.add_value(val);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_takable_struct() {
impl TakableContent<u64> for Vec<u64> {
fn add_value(&mut self, val: UpdateAction<u64>) {
match val {
UpdateAction::Notify(account, _) => self.push(account),
UpdateAction::Remove(_, _) => (),
}
}
}
let content: Vec<u64> = vec![];
let mut takable = TakableMap::new(content);
takable.add_value(UpdateAction::Notify(23, 0), false);
assert_eq!(takable.content.as_ref().unwrap().len(), 1);
takable.add_value(UpdateAction::Notify(24, 0), true);
assert_eq!(takable.content.as_ref().unwrap().len(), 1);
assert_eq!(takable.updates.len(), 1);
let take_content = (&mut takable).take();
assert_take_content_map(&take_content, 1);
let content = match take_content {
TakeResult::Taken(_) => panic!("not a content"),
TakeResult::Map(content) => content,
};
assert_eq!(takable.updates.len(), 1);
let take_content = (&mut takable).take();
assert_take_content_taken(&take_content);
let notifier = match take_content {
TakeResult::Taken(notifier) => notifier,
TakeResult::Map(_) => panic!("not a notifier"),
};
assert_eq!(notifier.len(), 1);
let notif_jh = tokio::spawn(async move {
notifier[0].as_ref().notified().await;
});
assert!(takable.content.is_none());
assert_eq!(takable.updates.len(), 1);
takable.add_value(UpdateAction::Notify(25, 0), false);
assert_eq!(takable.updates.len(), 2);
takable.merge(content).unwrap();
assert_eq!(takable.content.as_ref().unwrap().len(), 3);
assert_eq!(takable.updates.len(), 0);
//wait for notifier
if tokio::time::timeout(std::time::Duration::from_millis(1000), notif_jh)
.await
.is_err()
{
panic!("take notifier timeout");
}
}
fn assert_take_content_map(take_content: &TakeResult<Vec<u64>>, len: usize) {
match take_content {
TakeResult::Taken(_) => unreachable!(),
TakeResult::Map(content) => assert_eq!(content.len(), len),
}
}
fn assert_take_content_taken(take_content: &TakeResult<Vec<u64>>) {
match take_content {
TakeResult::Taken(_) => (),
TakeResult::Map(_) => unreachable!(),
}
}
}

296
stake_vote/src/vote.rs Normal file
View File

@ -0,0 +1,296 @@
use crate::utils::TakableContent;
use crate::utils::TakableMap;
use crate::utils::UpdateAction;
use crate::AccountPretty;
use crate::Slot;
use anyhow::bail;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_rpc_client_api::request::MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY;
use solana_rpc_client_api::response::RpcVoteAccountInfo;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::vote::state::VoteState;
use std::collections::HashMap;
use std::sync::Arc;
pub type VoteMap = HashMap<Pubkey, Arc<StoredVote>>;
pub type VoteContent = (VoteMap, EpochVoteStakesCache);
#[derive(Debug, Clone)]
pub struct EpochVoteStakes {
pub vote_stakes: HashMap<Pubkey, (u64, Arc<StoredVote>)>,
pub epoch: u64,
}
//TODO define the cache invalidation.
#[derive(Default)]
pub struct EpochVoteStakesCache {
pub cache: HashMap<u64, EpochVoteStakes>,
}
impl EpochVoteStakesCache {
pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option<&EpochVoteStakes> {
self.cache.get(&epoch)
}
pub fn add_stakes_for_epoch(&mut self, vote_stakes: EpochVoteStakes) {
log::debug!("add_stakes_for_epoch :{}", vote_stakes.epoch);
self.cache.insert(vote_stakes.epoch, vote_stakes);
}
}
impl TakableContent<StoredVote> for VoteContent {
fn add_value(&mut self, val: UpdateAction<StoredVote>) {
VoteStore::process_vote_action(&mut self.0, val);
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StoredVote {
pub pubkey: Pubkey,
pub vote_data: VoteState,
pub last_update_slot: Slot,
pub write_version: u64,
}
impl StoredVote {
pub fn convert_to_rpc_vote_account_info(
&self,
activated_stake: u64,
epoch_vote_account: bool,
) -> RpcVoteAccountInfo {
let last_vote = self
.vote_data
.votes
.iter()
.last()
.map(|vote| vote.slot())
.unwrap_or_default();
RpcVoteAccountInfo {
vote_pubkey: self.pubkey.to_string(),
node_pubkey: self.vote_data.node_pubkey.to_string(),
activated_stake,
commission: self.vote_data.commission,
epoch_vote_account,
epoch_credits: self.vote_data.epoch_credits.clone(),
last_vote,
root_slot: self.vote_data.root_slot.unwrap_or_default(),
}
}
}
#[derive(Default)]
pub struct VoteStore {
pub votes: TakableMap<StoredVote, VoteContent>,
}
impl VoteStore {
pub fn new(capacity: usize) -> Self {
VoteStore {
votes: TakableMap::new((
HashMap::with_capacity(capacity),
EpochVoteStakesCache::default(),
)),
}
}
pub fn notify_vote_change(
&mut self,
new_account: AccountPretty,
current_end_epoch_slot: Slot,
) -> anyhow::Result<()> {
if new_account.lamports == 0 {
//self.remove_from_store(&new_account.pubkey, new_account.slot);
self.votes.add_value(
UpdateAction::Remove(new_account.pubkey, new_account.slot),
new_account.slot > current_end_epoch_slot,
);
} else {
let Ok(mut vote_data) = new_account.read_vote() else {
bail!("Can't read Vote from account data");
};
//remove unnecessary entry. See Solana code rpc::rpc::get_vote_accounts
let epoch_credits = vote_data.epoch_credits();
vote_data.epoch_credits =
if epoch_credits.len() > MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY {
epoch_credits
.iter()
.skip(epoch_credits.len() - MAX_RPC_VOTE_ACCOUNT_INFO_EPOCH_CREDITS_HISTORY)
.cloned()
.collect()
} else {
epoch_credits.clone()
};
//log::info!("add_vote {} :{vote_data:?}", new_account.pubkey);
let new_voteacc = StoredVote {
pubkey: new_account.pubkey,
vote_data,
last_update_slot: new_account.slot,
write_version: new_account.write_version,
};
let action_update_slot = new_voteacc.last_update_slot;
self.votes.add_value(
UpdateAction::Notify(action_update_slot, new_voteacc),
action_update_slot > current_end_epoch_slot,
);
}
Ok(())
}
fn process_vote_action(votes: &mut VoteMap, action: UpdateAction<StoredVote>) {
match action {
UpdateAction::Notify(_, vote) => {
Self::vote_map_insert_vote(votes, vote);
}
UpdateAction::Remove(account_pk, slot) => {
Self::remove_from_store(votes, &account_pk, slot)
}
}
}
fn remove_from_store(votes: &mut VoteMap, account_pk: &Pubkey, update_slot: Slot) {
//TODO use action.
if votes
.get(account_pk)
.map(|vote| vote.last_update_slot <= update_slot)
.unwrap_or(true)
{
log::info!("Vote remove_from_store for {}", account_pk.to_string());
votes.remove(account_pk);
}
}
fn vote_map_insert_vote(map: &mut VoteMap, vote_data: StoredVote) {
let vote_account_pk = vote_data.pubkey;
match map.entry(vote_account_pk) {
std::collections::hash_map::Entry::Occupied(occupied) => {
let voteacc = occupied.into_mut(); // <-- get mut reference to existing value
if voteacc.last_update_slot <= vote_data.last_update_slot {
// generate a lot of trace log::trace!(
// "Vote updated for: {vote_account_pk} node_id:{} root_slot:{:?}",
// vote_data.vote_data.node_pubkey,
// vote_data.vote_data.root_slot,
// );
// if vote_data.vote_data.root_slot.is_none() {
// log::info!("Update vote account:{vote_account_pk} with None root slot.");
// }
// if voteacc.vote_data.root_slot.is_none() {
// log::info!(
// "Update vote account:{vote_account_pk} that were having None root slot."
// );
// }
*voteacc = Arc::new(vote_data);
}
}
// If value doesn't exist yet, then insert a new value of 1
std::collections::hash_map::Entry::Vacant(vacant) => {
log::trace!(
"New Vote added for: {vote_account_pk} node_id:{}, root slot:{:?}",
vote_data.vote_data.node_pubkey,
vote_data.vote_data.root_slot,
);
vacant.insert(Arc::new(vote_data));
}
};
}
}
pub fn merge_program_account_in_vote_map(
vote_map: &mut VoteMap,
pa_list: Vec<(Pubkey, Account)>,
last_update_slot: Slot,
) {
pa_list
.into_iter()
.filter_map(
|(pk, account)| match VoteState::deserialize(&account.data) {
Ok(vote) => Some((pk, vote)),
Err(err) => {
log::warn!("Error during vote account data deserialisation:{err}");
None
}
},
)
.for_each(|(pk, vote)| {
//log::info!("Vote init {pk} :{vote:?}");
let vote = StoredVote {
pubkey: pk,
vote_data: vote,
last_update_slot,
write_version: 0,
};
VoteStore::vote_map_insert_vote(vote_map, vote);
});
}
// Validators that are this number of slots behind are considered delinquent
pub fn get_rpc_vote_accounts_info(
current_slot: Slot,
votes: &VoteMap,
vote_accounts: &HashMap<Pubkey, (u64, Arc<StoredVote>)>,
config: GetVoteAccountsConfig,
) -> RpcVoteAccountStatus {
pub const DELINQUENT_VALIDATOR_SLOT_DISTANCE: u64 =
solana_rpc_client_api::request::DELINQUENT_VALIDATOR_SLOT_DISTANCE;
let delinquent_validator_slot_distance = config
.delinquent_slot_distance
.unwrap_or(DELINQUENT_VALIDATOR_SLOT_DISTANCE);
//From Solana rpc::rpc::metaz::get_vote_accounts() code.
let (current_vote_accounts, delinquent_vote_accounts): (
Vec<RpcVoteAccountInfo>,
Vec<RpcVoteAccountInfo>,
) = votes
.values()
.map(|vote| {
let (stake, epoch_vote_account) = vote_accounts
.get(&vote.pubkey)
.map(|(stake, _)| (*stake, true))
.unwrap_or((0, false));
vote.convert_to_rpc_vote_account_info(stake, epoch_vote_account)
})
.partition(|vote_account_info| {
if current_slot >= delinquent_validator_slot_distance {
vote_account_info.last_vote > current_slot - delinquent_validator_slot_distance
} else {
vote_account_info.last_vote > 0
}
});
let keep_unstaked_delinquents = config.keep_unstaked_delinquents.unwrap_or_default();
let delinquent_vote_accounts = if !keep_unstaked_delinquents {
delinquent_vote_accounts
.into_iter()
.filter(|vote_account_info| vote_account_info.activated_stake > 0)
.collect::<Vec<_>>()
} else {
delinquent_vote_accounts
};
RpcVoteAccountStatus {
current: current_vote_accounts,
delinquent: delinquent_vote_accounts,
}
}
pub fn get_rpc_vote_account_info_from_current_epoch_stakes(
current_epoch_stakes: &EpochVoteStakes,
) -> RpcVoteAccountStatus {
let current_vote_accounts: Vec<RpcVoteAccountInfo> = current_epoch_stakes
.vote_stakes
.values()
.map(|(stake, vote)| vote.convert_to_rpc_vote_account_info(*stake, true))
.collect();
RpcVoteAccountStatus {
current: current_vote_accounts,
delinquent: vec![], //no info about delinquent at startup.
}
}

View File

@ -78,3 +78,13 @@ test('get epoch info', async () => {
});
test('get leader schedule', async () => {
{
const leaderSchedule = await connection.getLeaderSchedule();
expect(Object.keys(leaderSchedule).length > 0);
}
});

107
yarn.lock
View File

@ -22,7 +22,7 @@
resolved "https://registry.npmjs.org/@babel/compat-data/-/compat-data-7.20.10.tgz"
integrity sha512-sEnuDPpOJR/fcafHMjpcpGN5M2jbUGUHwmuWKM/YdPzeEDJg8bgmbcWQFUfE32MQjti1koACvoPVsDe8Uq+idg==
"@babel/core@^7.11.6", "@babel/core@^7.12.3":
"@babel/core@^7.0.0", "@babel/core@^7.0.0-0", "@babel/core@^7.11.6", "@babel/core@^7.12.3", "@babel/core@^7.8.0", "@babel/core@>=7.0.0-beta.0 <8":
version "7.20.12"
resolved "https://registry.npmjs.org/@babel/core/-/core-7.20.12.tgz"
integrity sha512-XsMfHovsUYHFMdrIHkZphTN/2Hzzi78R08NuHfDBehym2VsPDL6Zn/JAD/JQdnRvbSsbQc4mVaU1m6JgtTEElg==
@ -501,7 +501,7 @@
slash "^3.0.0"
write-file-atomic "^4.0.1"
"@jest/types@^29.3.1":
"@jest/types@^29.0.0", "@jest/types@^29.3.1":
version "29.3.1"
resolved "https://registry.npmjs.org/@jest/types/-/types-29.3.1.tgz"
integrity sha512-d0S0jmmTpjnhCmNpApgX3jrUZgZ22ivKJRvL2lli5hpCRoNnp1f85r2/wpKfXuYu8E7Jjh1hGfhPyup1NM5AmA==
@ -540,7 +540,7 @@
resolved "https://registry.npmjs.org/@jridgewell/set-array/-/set-array-1.1.2.tgz"
integrity sha512-xnkseuNADM0gt2bs+BvhO0p78Mk762YnZdsuzFV018NoG1Sj1SCQvpSqa7XUaTam5vAGasABV9qXASMKnFMwMw==
"@jridgewell/sourcemap-codec@1.4.14", "@jridgewell/sourcemap-codec@^1.4.10":
"@jridgewell/sourcemap-codec@^1.4.10", "@jridgewell/sourcemap-codec@1.4.14":
version "1.4.14"
resolved "https://registry.npmjs.org/@jridgewell/sourcemap-codec/-/sourcemap-codec-1.4.14.tgz"
integrity sha512-XPSJHWmi394fuUuzDnGz1wiKqWfo1yXecHQMRf2l6hztTO+nPru658AyDngaBe7isIxEkRsPR3FZh+s7iVa4Uw==
@ -613,7 +613,7 @@
"@solana/buffer-layout-utils" "^0.2.0"
buffer "^6.0.3"
"@solana/web3.js@^1.32.0", "@solana/web3.js@^1.73.0":
"@solana/web3.js@^1.32.0", "@solana/web3.js@^1.47.4", "@solana/web3.js@^1.73.0":
version "1.73.0"
resolved "https://registry.npmjs.org/@solana/web3.js/-/web3.js-1.73.0.tgz"
integrity sha512-YrgX3Py7ylh8NYkbanoINUPCj//bWUjYZ5/WPy9nQ9SK3Cl7QWCR+NmbDjmC/fTspZGR+VO9LTQslM++jr5PRw==
@ -743,14 +743,6 @@
dependencies:
"@types/yargs-parser" "*"
JSONStream@^1.3.5:
version "1.3.5"
resolved "https://registry.npmjs.org/JSONStream/-/JSONStream-1.3.5.tgz"
integrity sha512-E+iruNOY8VV9s4JEbe1aNEm6MiszPRr/UfcHMz0TQh1BXSxHK+ASV1R6W4HpjBhSeS+54PIsAMCBmwD06LLsqQ==
dependencies:
jsonparse "^1.2.0"
through ">=2.2.7 <3"
agentkeepalive@^4.2.1:
version "4.2.1"
resolved "https://registry.npmjs.org/agentkeepalive/-/agentkeepalive-4.2.1.tgz"
@ -806,7 +798,7 @@ argparse@^1.0.7:
dependencies:
sprintf-js "~1.0.2"
babel-jest@^29.3.1:
babel-jest@^29.0.0, babel-jest@^29.3.1:
version "29.3.1"
resolved "https://registry.npmjs.org/babel-jest/-/babel-jest-29.3.1.tgz"
integrity sha512-aard+xnMoxgjwV70t0L6wkW/3HQQtV+O0PEimxKgzNqCJnbYmroPojdP2tqKSOAt8QAKV/uSZU8851M7B5+fcA==
@ -931,7 +923,7 @@ braces@^3.0.2:
dependencies:
fill-range "^7.0.1"
browserslist@^4.21.3:
browserslist@^4.21.3, "browserslist@>= 4.21.0":
version "4.21.4"
resolved "https://registry.npmjs.org/browserslist/-/browserslist-4.21.4.tgz"
integrity sha512-CBHJJdDmgjl3daYjN5Cp5kbTf1mUhZoS+beLklHIvkOWscs83YAhLlF3Wsh/lciQYAcbBJgTOD44VtG31ZM4Hw==
@ -967,14 +959,6 @@ buffer-from@^1.0.0:
resolved "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz"
integrity sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==
buffer@6.0.1:
version "6.0.1"
resolved "https://registry.npmjs.org/buffer/-/buffer-6.0.1.tgz"
integrity sha512-rVAXBwEcEoYtxnHSO5iWyhzV/O1WMtkUYWlfdLS7FjU4PnSJJHEfHXi/uHPI5EwltmOA794gN3bm3/pzuctWjQ==
dependencies:
base64-js "^1.3.1"
ieee754 "^1.2.1"
buffer@^6.0.3, buffer@~6.0.3:
version "6.0.3"
resolved "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz"
@ -983,6 +967,14 @@ buffer@^6.0.3, buffer@~6.0.3:
base64-js "^1.3.1"
ieee754 "^1.2.1"
buffer@6.0.1:
version "6.0.1"
resolved "https://registry.npmjs.org/buffer/-/buffer-6.0.1.tgz"
integrity sha512-rVAXBwEcEoYtxnHSO5iWyhzV/O1WMtkUYWlfdLS7FjU4PnSJJHEfHXi/uHPI5EwltmOA794gN3bm3/pzuctWjQ==
dependencies:
base64-js "^1.3.1"
ieee754 "^1.2.1"
bufferutil@^4.0.1:
version "4.0.7"
resolved "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.7.tgz"
@ -1075,16 +1067,16 @@ color-convert@^2.0.1:
dependencies:
color-name "~1.1.4"
color-name@1.1.3:
version "1.1.3"
resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz"
integrity sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw==
color-name@~1.1.4:
version "1.1.4"
resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz"
integrity sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==
color-name@1.1.3:
version "1.1.3"
resolved "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz"
integrity sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw==
commander@^2.20.3:
version "2.20.3"
resolved "https://registry.npmjs.org/commander/-/commander-2.20.3.tgz"
@ -1095,7 +1087,12 @@ concat-map@0.0.1:
resolved "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz"
integrity sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==
convert-source-map@^1.6.0, convert-source-map@^1.7.0:
convert-source-map@^1.6.0:
version "1.9.0"
resolved "https://registry.npmjs.org/convert-source-map/-/convert-source-map-1.9.0.tgz"
integrity sha512-ASFBup0Mz1uyiIjANan1jzLQami9z1PoYSZCiiYW2FczPbenXc45FZdBZLzOT+r6+iciuEModtmCti+hjaAk0A==
convert-source-map@^1.7.0:
version "1.9.0"
resolved "https://registry.npmjs.org/convert-source-map/-/convert-source-map-1.9.0.tgz"
integrity sha512-ASFBup0Mz1uyiIjANan1jzLQami9z1PoYSZCiiYW2FczPbenXc45FZdBZLzOT+r6+iciuEModtmCti+hjaAk0A==
@ -1116,7 +1113,7 @@ cross-spawn@^7.0.3:
crypto@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/crypto/-/crypto-1.0.1.tgz#2af1b7cad8175d24c8a1b0778255794a21803037"
resolved "https://registry.npmjs.org/crypto/-/crypto-1.0.1.tgz"
integrity sha512-VxBKmeNcqQdiUQUW2Tzq0t377b54N2bMtXO/qiLa+6eRRmmC4qT3D4OnTGoT/U6O9aklQ/jTwbOtRMTTY8G0Ig==
debug@^4.1.0, debug@^4.1.1:
@ -1251,7 +1248,7 @@ eyes@^0.1.8:
resolved "https://registry.npmjs.org/eyes/-/eyes-0.1.8.tgz"
integrity sha512-GipyPsXO1anza0AOZdy69Im7hGFCNB7Y/NGjDlZGJ3GJJLtwNSb2vrzYrTYJRrRloVx7pl+bhUaTB8yiccPvFQ==
fast-json-stable-stringify@2.x, fast-json-stable-stringify@^2.1.0:
fast-json-stable-stringify@^2.1.0, fast-json-stable-stringify@2.x:
version "2.1.0"
resolved "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz"
integrity sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==
@ -1293,11 +1290,6 @@ fs.realpath@^1.0.0:
resolved "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz"
integrity sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==
fsevents@^2.3.2:
version "2.3.2"
resolved "https://registry.yarnpkg.com/fsevents/-/fsevents-2.3.2.tgz#8a526f78b8fdf4623b709e0b975c52c24c02fd1a"
integrity sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==
function-bind@^1.1.1:
version "1.1.1"
resolved "https://registry.npmjs.org/function-bind/-/function-bind-1.1.1.tgz"
@ -1502,13 +1494,13 @@ jayson@^3.4.4:
"@types/connect" "^3.4.33"
"@types/node" "^12.12.54"
"@types/ws" "^7.4.4"
JSONStream "^1.3.5"
commander "^2.20.3"
delay "^5.0.0"
es6-promisify "^5.0.0"
eyes "^0.1.8"
isomorphic-ws "^4.0.1"
json-stringify-safe "^5.0.1"
JSONStream "^1.3.5"
lodash "^4.17.20"
uuid "^8.3.2"
ws "^7.4.5"
@ -1716,7 +1708,7 @@ jest-resolve-dependencies@^29.3.1:
jest-regex-util "^29.2.0"
jest-snapshot "^29.3.1"
jest-resolve@^29.3.1:
jest-resolve@*, jest-resolve@^29.3.1:
version "29.3.1"
resolved "https://registry.npmjs.org/jest-resolve/-/jest-resolve-29.3.1.tgz"
integrity sha512-amXJgH/Ng712w3Uz5gqzFBBjxV8WFLSmNjoreBGMqxgCz5cH7swmBZzgBaCIOsvb0NbpJ0vgaSFdJqMdT+rADw==
@ -1864,7 +1856,7 @@ jest-worker@^29.3.1:
merge-stream "^2.0.0"
supports-color "^8.0.0"
jest@^29.3.1:
jest@^29.0.0, jest@^29.3.1:
version "29.3.1"
resolved "https://registry.npmjs.org/jest/-/jest-29.3.1.tgz"
integrity sha512-6iWfL5DTT0Np6UYs/y5Niu7WIfNv/wRTtN5RSXt2DIEft3dx3zPuw/3WJQBCJfmEzvDiEKwoqMbGD9n49+qLSA==
@ -1912,6 +1904,14 @@ jsonparse@^1.2.0:
resolved "https://registry.npmjs.org/jsonparse/-/jsonparse-1.3.1.tgz"
integrity sha512-POQXvpdL69+CluYsillJ7SUhKvytYjW9vG/GKpnf+xP8UWgYEM/RaMzHHofbALDiKbbP1W8UEYmgGl39WkPZsg==
JSONStream@^1.3.5:
version "1.3.5"
resolved "https://registry.npmjs.org/JSONStream/-/JSONStream-1.3.5.tgz"
integrity sha512-E+iruNOY8VV9s4JEbe1aNEm6MiszPRr/UfcHMz0TQh1BXSxHK+ASV1R6W4HpjBhSeS+54PIsAMCBmwD06LLsqQ==
dependencies:
jsonparse "^1.2.0"
through ">=2.2.7 <3"
kleur@^3.0.3:
version "3.0.3"
resolved "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz"
@ -2002,7 +2002,7 @@ minimatch@^3.0.4, minimatch@^3.1.1:
dependencies:
brace-expansion "^1.1.7"
ms@2.1.2, ms@^2.0.0:
ms@^2.0.0, ms@2.1.2:
version "2.1.2"
resolved "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz"
integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==
@ -2214,17 +2214,24 @@ safe-buffer@^5.0.1:
resolved "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==
semver@7.x, semver@^7.3.5:
semver@^6.0.0, semver@^6.3.0:
version "6.3.0"
resolved "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz"
integrity sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==
semver@^7.3.5:
version "7.3.8"
resolved "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz"
integrity sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==
dependencies:
lru-cache "^6.0.0"
semver@^6.0.0, semver@^6.3.0:
version "6.3.0"
resolved "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz"
integrity sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==
semver@7.x:
version "7.3.8"
resolved "https://registry.npmjs.org/semver/-/semver-7.3.8.tgz"
integrity sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==
dependencies:
lru-cache "^6.0.0"
shebang-command@^2.0.0:
version "2.0.0"
@ -2413,10 +2420,10 @@ type-fest@^0.21.3:
resolved "https://registry.npmjs.org/type-fest/-/type-fest-0.21.3.tgz"
integrity sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==
typescript@^4.9.4:
version "4.9.4"
resolved "https://registry.npmjs.org/typescript/-/typescript-4.9.4.tgz"
integrity sha512-Uz+dTXYzxXXbsFpM86Wh3dKCxrQqUcVMxwU54orwlJjOpO3ao8L7j5lH+dWfTwgCwIuM9GQ2kvVotzYJMXTBZg==
typescript@^4.9.5, typescript@>=4.3:
version "4.9.5"
resolved "https://registry.npmjs.org/typescript/-/typescript-4.9.5.tgz"
integrity sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==
update-browserslist-db@^1.0.9:
version "1.0.10"
@ -2426,7 +2433,7 @@ update-browserslist-db@^1.0.9:
escalade "^3.1.1"
picocolors "^1.0.0"
utf-8-validate@^5.0.2:
utf-8-validate@^5.0.2, utf-8-validate@>=5.0.2:
version "5.0.10"
resolved "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz"
integrity sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==
@ -2496,7 +2503,7 @@ write-file-atomic@^4.0.1:
imurmurhash "^0.1.4"
signal-exit "^3.0.7"
ws@^7.4.5:
ws@*, ws@^7.4.5:
version "7.5.9"
resolved "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==