diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 985a3a74..6ae03eec 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -490,6 +490,7 @@ impl LiteRpcServer for LiteBridge { slot: Option, config: Option, ) -> crate::rpc::Result>>> { + log::warn!("receive get_leader_schedule rpc call"); //TODO verify leader identity. let schedule = self .data_cache diff --git a/stake_vote/src/leader_schedule.rs b/stake_vote/src/leader_schedule.rs index 3fe756dc..04e391f4 100644 --- a/stake_vote/src/leader_schedule.rs +++ b/stake_vote/src/leader_schedule.rs @@ -304,7 +304,6 @@ pub fn calculate_leader_schedule( let mut seed = [0u8; 32]; seed[0..8].copy_from_slice(&epoch.to_le_bytes()); sort_stakes(&mut stakes); - log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}"); LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS) } diff --git a/stake_vote/src/lib.rs b/stake_vote/src/lib.rs index 55bca52a..e22012f3 100644 --- a/stake_vote/src/lib.rs +++ b/stake_vote/src/lib.rs @@ -3,11 +3,13 @@ use crate::bootstrap::BootstrapEvent; use futures::Stream; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; +use solana_lite_rpc_core::stores::block_information_store::BlockInformation; use solana_lite_rpc_core::stores::data_cache::DataCache; use solana_lite_rpc_core::structures::leaderschedule::GetVoteAccountsConfig; use solana_lite_rpc_core::types::SlotStream; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::response::RpcVoteAccountStatus; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use std::collections::HashMap; use std::sync::Arc; @@ -92,8 +94,14 @@ pub async fn start_stakes_and_votes_loop( } } Some((config, return_channel)) = vote_account_rpc_request.recv() => { - let current_slot = solana_lite_rpc_core::solana_utils::get_current_confirmed_slot(&data_cache).await; - rpc_request_processor.process_get_vote_accounts(current_slot, config, return_channel, &mut votestore).await; + let commitment = config.commitment.unwrap_or(CommitmentConfig::confirmed()); + let BlockInformation { slot, .. } = data_cache + .block_information_store + .get_latest_block(commitment) + .await; + + let current_epoch = data_cache.get_current_epoch(commitment).await; + rpc_request_processor.process_get_vote_accounts(slot, current_epoch.epoch, config, return_channel, &mut votestore).await; } //manage rpc waiting request notification. Some(Ok((votes, vote_accounts, rpc_vote_accounts))) = rpc_request_processor.rpc_exec_task.next() => { @@ -105,8 +113,8 @@ pub async fn start_stakes_and_votes_loop( ).await; } //manage rpc waiting request notification. - Some(Ok((current_slot, config))) = rpc_request_processor.rpc_notify_task.next() => { - rpc_request_processor.take_vote_accounts_and_process(&mut votestore, current_slot, config).await; + Some(Ok((current_slot, epoch, config))) = rpc_request_processor.rpc_notify_task.next() => { + rpc_request_processor.take_vote_accounts_and_process(&mut votestore, current_slot, epoch, config).await; } //manage geyser stake_history notification ret = stake_history_geyzer_stream.next() => { diff --git a/stake_vote/src/rpcrequest.rs b/stake_vote/src/rpcrequest.rs index cc752e5e..e79b1b2a 100644 --- a/stake_vote/src/rpcrequest.rs +++ b/stake_vote/src/rpcrequest.rs @@ -11,7 +11,7 @@ use tokio::sync::oneshot; use tokio::task::JoinHandle; pub struct RpcRequestData { - pub rpc_notify_task: FuturesUnordered>, + pub rpc_notify_task: FuturesUnordered>, pub rpc_exec_task: FuturesUnordered>, pending_rpc_request: Option>>, @@ -29,6 +29,7 @@ impl RpcRequestData { pub async fn process_get_vote_accounts( &mut self, current_slot: Slot, + epoch: u64, config: GetVoteAccountsConfig, return_channel: oneshot::Sender, votestore: &mut VoteStore, @@ -39,7 +40,7 @@ impl RpcRequestData { self.pending_rpc_request = Some(vec![return_channel]); } } - self.take_vote_accounts_and_process(votestore, current_slot, config) + self.take_vote_accounts_and_process(votestore, current_slot, epoch, config) .await; } pub async fn notify_end_rpc_get_vote_accounts( @@ -50,14 +51,15 @@ impl RpcRequestData { votestore: &mut VoteStore, ) { if let Err(err) = votestore.votes.merge((votes, vote_accounts)) { - log::info!("Error during RPC get vote account merge:{err}"); + log::error!("Error during RPC get vote account merge:{err}"); } - //avoid clone on the first request - if let Some(ref mut pending_rpc_request) = self.pending_rpc_request { - for return_channel in pending_rpc_request.drain(0..pending_rpc_request.len() - 1) { - if return_channel.send(rpc_vote_accounts.clone()).is_err() { - log::error!("Vote accounts RPC channel send closed."); + if let Some(mut pending_rpc_request) = self.pending_rpc_request.take() { + if pending_rpc_request.len() > 1 { + for return_channel in pending_rpc_request.drain(0..pending_rpc_request.len() - 1) { + if return_channel.send(rpc_vote_accounts.clone()).is_err() { + log::error!("Vote accounts RPC channel send closed."); + } } } if pending_rpc_request @@ -75,30 +77,41 @@ impl RpcRequestData { &mut self, votestore: &mut VoteStore, current_slot: Slot, + epoch: u64, config: GetVoteAccountsConfig, ) { - if let Some(((votes, vote_accounts), (current_slot, config))) = + if let Some(((votes, vote_accounts), (current_slot, epoch, config))) = wait_for_merge_or_get_content( &mut votestore.votes, - (current_slot, config), + (current_slot, epoch, config), &mut self.rpc_notify_task, ) .await { + //validate that we have the epoch. + let jh = tokio::task::spawn_blocking({ - move || { - let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info( - current_slot, - &votes, - //TODO manage missing epoch and return error. - &vote_accounts - .vote_stakes_for_epoch(0) - .as_ref() - .unwrap() - .vote_stakes, - config, - ); - (votes, vote_accounts, rpc_vote_accounts) + move || match vote_accounts.vote_stakes_for_epoch(epoch) { + Some(stakes) => { + let rpc_vote_accounts = crate::vote::get_rpc_vote_accounts_info( + current_slot, + &votes, + &stakes.vote_stakes, + config, + ); + (votes, vote_accounts, rpc_vote_accounts) + } + None => { + log::warn!("Get vote account for epoch:{epoch}. No data available"); + ( + votes, + vote_accounts, + RpcVoteAccountStatus { + current: vec![], + delinquent: vec![], + }, + ) + } } }); self.rpc_exec_task.push(jh); diff --git a/stake_vote/src/utils.rs b/stake_vote/src/utils.rs index 94553c85..831236e7 100644 --- a/stake_vote/src/utils.rs +++ b/stake_vote/src/utils.rs @@ -122,7 +122,7 @@ impl<'a, T, C: TakableContent> Takable for &'a mut TakableMap { content.add_value(val); } self.content = Some(content); - self.notifier.notify_waiters(); + self.notifier.notify_one(); Ok(()) } else { bail!("TakableMap with a existing content".to_string()) @@ -232,8 +232,8 @@ impl + Default> TakableMap { mod tests { use super::*; - #[test] - fn test_takable_struct() { + #[tokio::test] + async fn test_takable_struct() { impl TakableContent for Vec { fn add_value(&mut self, val: UpdateAction) { match val { @@ -254,23 +254,35 @@ mod tests { let take_content = (&mut takable).take(); assert_take_content_map(&take_content, 1); - assert_eq!(takable.updates.len(), 1); - let take_content = (&mut takable).take(); - assert_take_content_taken(&take_content); - assert!(takable.content.is_none()); - assert_eq!(takable.updates.len(), 1); - takable.add_value(UpdateAction::Notify(25, 0), false); - assert_eq!(takable.updates.len(), 2); let content = match take_content { TakeResult::Taken(_) => panic!("not a content"), TakeResult::Map(content) => content, }; + assert_eq!(takable.updates.len(), 1); + let take_content = (&mut takable).take(); + assert_take_content_taken(&take_content); + let notifier = match take_content { + TakeResult::Taken(notifier) => notifier, + TakeResult::Map(_) => panic!("not a notifier"), + }; + assert_eq!(notifier.len(), 1); + let notif_jh = tokio::spawn(async move { + notifier[0].as_ref().notified().await; + }); + + assert!(takable.content.is_none()); + assert_eq!(takable.updates.len(), 1); + takable.add_value(UpdateAction::Notify(25, 0), false); + assert_eq!(takable.updates.len(), 2); takable.merge(content).unwrap(); assert_eq!(takable.content.as_ref().unwrap().len(), 3); assert_eq!(takable.updates.len(), 0); - //merge(&mut takable, vec![]); - //assert!(err.is_err()); + //wait for notifier + if let Err(_) = tokio::time::timeout(std::time::Duration::from_millis(1000), notif_jh).await + { + assert!(false, "take notifier timeout"); + } } fn assert_take_content_map(take_content: &TakeResult>, len: usize) { diff --git a/stake_vote/src/vote.rs b/stake_vote/src/vote.rs index e119e61c..08f22161 100644 --- a/stake_vote/src/vote.rs +++ b/stake_vote/src/vote.rs @@ -31,11 +31,12 @@ pub struct EpochVoteStakesCache { } impl EpochVoteStakesCache { - pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option { - self.cache.get(&epoch).cloned() + pub fn vote_stakes_for_epoch(&self, epoch: u64) -> Option<&EpochVoteStakes> { + self.cache.get(&epoch) } pub fn add_stakes_for_epoch(&mut self, vote_stakes: EpochVoteStakes) { + log::info!("add_stakes_for_epoch :{}", vote_stakes.epoch); if self.cache.insert(vote_stakes.epoch, vote_stakes).is_some() { log::warn!("Override existing vote stake epoch cache for epoch:"); }