test getvoteaccount. Correct some bugs

This commit is contained in:
musitdev 2023-11-09 16:37:37 +01:00
parent 901b76c4e0
commit 5c64a151c2
6 changed files with 76 additions and 42 deletions

View File

@ -490,6 +490,7 @@ impl LiteRpcServer for LiteBridge {
slot: Option<u64>,
config: Option<RpcLeaderScheduleConfig>,
) -> crate::rpc::Result<Option<HashMap<String, Vec<usize>>>> {
log::warn!("receive get_leader_schedule rpc call");
//TODO verify leader identity.
let schedule = self
.data_cache

View File

@ -304,7 +304,6 @@ pub fn calculate_leader_schedule(
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch.to_le_bytes());
sort_stakes(&mut stakes);
log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}");
LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS)
}

View File

@ -3,11 +3,13 @@ use crate::bootstrap::BootstrapEvent;
use futures::Stream;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use solana_lite_rpc_core::stores::block_information_store::BlockInformation;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig;
use solana_lite_rpc_core::types::SlotStream;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::response::RpcVoteAccountStatus;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::sync::Arc;
@ -92,8 +94,14 @@ pub async fn start_stakes_and_votes_loop(
}
}
Some((config, return_channel)) = vote_account_rpc_request.recv() => {
let current_slot = solana_lite_rpc_core::solana_utils::get_current_confirmed_slot(&data_cache).await;
rpc_request_processor.process_get_vote_accounts(current_slot, config, return_channel, &mut votestore).await;
let commitment = config.commitment.unwrap_or(CommitmentConfig::confirmed());
let BlockInformation { slot, .. } = data_cache
.block_information_store
.get_latest_block(commitment)
.await;
let current_epoch = data_cache.get_current_epoch(commitment).await;
rpc_request_processor.process_get_vote_accounts(slot, current_epoch.epoch, config, return_channel, &mut votestore).await;
}
//manage rpc waiting request notification.
Some(Ok((votes, vote_accounts, rpc_vote_accounts))) = rpc_request_processor.rpc_exec_task.next() => {
@ -105,8 +113,8 @@ pub async fn start_stakes_and_votes_loop(
).await;
}
//manage rpc waiting request notification.
Some(Ok((current_slot, config))) = rpc_request_processor.rpc_notify_task.next() => {
rpc_request_processor.take_vote_accounts_and_process(&mut votestore, current_slot, config).await;
Some(Ok((current_slot, epoch, config))) = rpc_request_processor.rpc_notify_task.next() => {
rpc_request_processor.take_vote_accounts_and_process(&mut votestore, current_slot, epoch, config).await;
}
//manage geyser stake_history notification
ret = stake_history_geyzer_stream.next() => {

View File

@ -11,7 +11,7 @@ use tokio::sync::oneshot;
use tokio::task::JoinHandle;
pub struct RpcRequestData {
pub rpc_notify_task: FuturesUnordered<JoinHandle<(u64, GetVoteAccountsConfig)>>,
pub rpc_notify_task: FuturesUnordered<JoinHandle<(u64, u64, GetVoteAccountsConfig)>>,
pub rpc_exec_task:
FuturesUnordered<JoinHandle<(VoteMap, EpochVoteStakesCache, RpcVoteAccountStatus)>>,
pending_rpc_request: Option<Vec<oneshot::Sender<RpcVoteAccountStatus>>>,
@ -29,6 +29,7 @@ impl RpcRequestData {
pub async fn process_get_vote_accounts(
&mut self,
current_slot: Slot,
epoch: u64,
config: GetVoteAccountsConfig,
return_channel: oneshot::Sender<RpcVoteAccountStatus>,
votestore: &mut VoteStore,
@ -39,7 +40,7 @@ impl RpcRequestData {
self.pending_rpc_request = Some(vec![return_channel]);
}
}
self.take_vote_accounts_and_process(votestore, current_slot, config)
self.take_vote_accounts_and_process(votestore, current_slot, epoch, config)
.await;
}
pub async fn notify_end_rpc_get_vote_accounts(
@ -50,14 +51,15 @@ impl RpcRequestData {
votestore: &mut VoteStore,
) {
if let Err(err) = votestore.votes.merge((votes, vote_accounts)) {
log::info!("Error during RPC get vote account merge:{err}");
log::error!("Error during RPC get vote account merge:{err}");
}
//avoid clone on the first request
if let Some(ref mut pending_rpc_request) = self.pending_rpc_request {
for return_channel in pending_rpc_request.drain(0..pending_rpc_request.len() - 1) {
if return_channel.send(rpc_vote_accounts.clone()).is_err() {
log::error!("Vote accounts RPC channel send closed.");
if let Some(mut pending_rpc_request) = self.pending_rpc_request.take() {
if pending_rpc_request.len() > 1 {
for return_channel in pending_rpc_request.drain(0..pending_rpc_request.len() - 1) {
if return_channel.send(rpc_vote_accounts.clone()).is_err() {
log::error!("Vote accounts RPC channel send closed.");
}
}
}
if pending_rpc_request
@ -75,30 +77,41 @@ impl RpcRequestData {
&mut self,
votestore: &mut VoteStore,
current_slot: Slot,
epoch: u64,
config: GetVoteAccountsConfig,
) {
if let Some(((votes, vote_accounts), (current_slot, config))) =
if let Some(((votes, vote_accounts), (current_slot, epoch, config))) =
wait_for_merge_or_get_content(
&mut votestore.votes,
(current_slot, config),
(current_slot, epoch, config),
&mut self.rpc_notify_task,
)
.await
{
//validate that we have the epoch.
let jh = tokio::task::spawn_blocking({
move || {
let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info(
current_slot,
&votes,
//TODO manage missing epoch and return error.
&vote_accounts
.vote_stakes_for_epoch(0)
.as_ref()
.unwrap()
.vote_stakes,
config,
);
(votes, vote_accounts, rpc_vote_accounts)
move || match vote_accounts.vote_stakes_for_epoch(epoch) {
Some(stakes) => {
let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info(
current_slot,
&votes,
&stakes.vote_stakes,
config,
);
(votes, vote_accounts, rpc_vote_accounts)
}
None => {
log::warn!("Get vote account for epoch:{epoch}. No data available");
(
votes,
vote_accounts,
RpcVoteAccountStatus {
current: vec![],
delinquent: vec![],
},
)
}
}
});
self.rpc_exec_task.push(jh);

View File

@ -122,7 +122,7 @@ impl<'a, T, C: TakableContent<T>> Takable<C> for &'a mut TakableMap<T, C> {
content.add_value(val);
}
self.content = Some(content);
self.notifier.notify_waiters();
self.notifier.notify_one();
Ok(())
} else {
bail!("TakableMap with a existing content".to_string())
@ -232,8 +232,8 @@ impl<T: Default, C: TakableContent<T> + Default> TakableMap<T, C> {
mod tests {
use super::*;
#[test]
fn test_takable_struct() {
#[tokio::test]
async fn test_takable_struct() {
impl TakableContent<u64> for Vec<u64> {
fn add_value(&mut self, val: UpdateAction<u64>) {
match val {
@ -254,23 +254,35 @@ mod tests {
let take_content = (&mut takable).take();
assert_take_content_map(&take_content, 1);
assert_eq!(takable.updates.len(), 1);
let take_content = (&mut takable).take();
assert_take_content_taken(&take_content);
assert!(takable.content.is_none());
assert_eq!(takable.updates.len(), 1);
takable.add_value(UpdateAction::Notify(25, 0), false);
assert_eq!(takable.updates.len(), 2);
let content = match take_content {
TakeResult::Taken(_) => panic!("not a content"),
TakeResult::Map(content) => content,
};
assert_eq!(takable.updates.len(), 1);
let take_content = (&mut takable).take();
assert_take_content_taken(&take_content);
let notifier = match take_content {
TakeResult::Taken(notifier) => notifier,
TakeResult::Map(_) => panic!("not a notifier"),
};
assert_eq!(notifier.len(), 1);
let notif_jh = tokio::spawn(async move {
notifier[0].as_ref().notified().await;
});
assert!(takable.content.is_none());
assert_eq!(takable.updates.len(), 1);
takable.add_value(UpdateAction::Notify(25, 0), false);
assert_eq!(takable.updates.len(), 2);
takable.merge(content).unwrap();
assert_eq!(takable.content.as_ref().unwrap().len(), 3);
assert_eq!(takable.updates.len(), 0);
//merge(&mut takable, vec![]);
//assert!(err.is_err());
//wait for notifier
if let Err(_) = tokio::time::timeout(std::time::Duration::from_millis(1000), notif_jh).await
{
assert!(false, "take notifier timeout");
}
}
fn assert_take_content_map(take_content: &TakeResult<Vec<u64>>, len: usize) {

View File

@ -31,11 +31,12 @@ pub struct EpochVoteStakesCache {
}
impl EpochVoteStakesCache {
pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option<EpochVoteStakes> {
self.cache.get(&epoch).cloned()
pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option<&EpochVoteStakes> {
self.cache.get(&epoch)
}
pub fn add_stakes_for_epoch(&mut self, vote_stakes: EpochVoteStakes) {
log::info!("add_stakes_for_epoch :{}", vote_stakes.epoch);
if self.cache.insert(vote_stakes.epoch, vote_stakes).is_some() {
log::warn!("Override existing vote stake epoch cache for epoch:");
}