From 03d4d1cb367fb82ac38f2cd4ee240defb4290f9d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 16 Jan 2019 06:14:55 -0800 Subject: [PATCH] Store and resend votes if leader's TPU port is unknown (#2438) * Store and resend votes if leader's TPU port is unknown * fix build errors * fix failing tests --- src/vote_signer_proxy.rs | 116 +++++++++++++++++++++++++++++++++------ 1 file changed, 98 insertions(+), 18 deletions(-) diff --git a/src/vote_signer_proxy.rs b/src/vote_signer_proxy.rs index 0cea818040..ad6b5a926b 100644 --- a/src/vote_signer_proxy.rs +++ b/src/vote_signer_proxy.rs @@ -78,6 +78,8 @@ pub struct VoteSignerProxy { keypair: Arc, signer: Box, pub vote_account: Pubkey, + last_leader: RwLock, + unsent_votes: RwLock>, } impl VoteSignerProxy { @@ -91,6 +93,8 @@ impl VoteSignerProxy { keypair: keypair.clone(), signer, vote_account, + last_leader: RwLock::new(vote_account), + unsent_votes: RwLock::new(vec![]), } } @@ -108,13 +112,44 @@ impl VoteSignerProxy { cluster_info: &Arc>, vote_blob_sender: &BlobSender, ) -> Result<()> { - let last_id = bank.last_id(); + { + let (leader, _) = bank.get_current_leader().unwrap(); - if let Ok(shared_blob) = self.new_signed_vote_blob(&last_id, bank, cluster_info) { - inc_new_counter_info!("validator-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; + let mut old_leader = self.last_leader.write().unwrap(); + + if leader != *old_leader { + *old_leader = leader; + self.unsent_votes.write().unwrap().clear(); + } + inc_new_counter_info!( + "validator-total_pending_votes", + self.unsent_votes.read().unwrap().len() + ); } + let tx = self.new_signed_vote_transaction(&bank.last_id(), bank.tick_height()); + + match VoteSignerProxy::get_leader_tpu(&bank, cluster_info) { + Ok(tpu) => { + self.unsent_votes.write().unwrap().retain(|old_tx| { + if let Ok(shared_blob) = self.new_signed_vote_blob(old_tx, tpu) { + inc_new_counter_info!("validator-pending_vote_sent", 1); + inc_new_counter_info!("validator-vote_sent", 1); + vote_blob_sender.send(vec![shared_blob]).unwrap(); + } + false + }); + if let Ok(shared_blob) = self.new_signed_vote_blob(&tx, tpu) { + inc_new_counter_info!("validator-vote_sent", 1); + vote_blob_sender.send(vec![shared_blob])?; + } + } + Err(_) => { + self.unsent_votes.write().unwrap().push(tx); + inc_new_counter_info!("validator-new_pending_vote", 1); + } + }; + Ok(()) } @@ -137,21 +172,8 @@ impl VoteSignerProxy { } } - // TODO: Change voting to be on fixed tick intervals based on bank state - fn new_signed_vote_blob( - &self, - last_id: &Hash, - bank: &Arc, - cluster_info: &Arc>, - ) -> Result { + fn new_signed_vote_blob(&self, tx: &Transaction, leader_tpu: SocketAddr) -> Result { let shared_blob = SharedBlob::default(); - let tick_height = bank.tick_height(); - - let leader_tpu = VoteSignerProxy::get_leader_tpu(&bank, cluster_info)?; - //TODO: doesn't seem like there is a synchronous call to get height and id - debug!("voting on {:?}", &last_id.as_ref()[..8]); - let tx = self.new_signed_vote_transaction(last_id, tick_height); - { let mut blob = shared_blob.write().unwrap(); let bytes = serialize(&tx)?; @@ -179,3 +201,61 @@ impl VoteSignerProxy { } } } + +#[cfg(test)] +mod test { + use crate::bank::Bank; + use crate::cluster_info::{ClusterInfo, Node}; + use crate::mint::Mint; + use crate::vote_signer_proxy::VoteSignerProxy; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_vote_signer::rpc::LocalVoteSigner; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::time::Duration; + + #[test] + pub fn test_pending_votes() { + solana_logger::setup(); + + let signer = VoteSignerProxy::new( + &Arc::new(Keypair::new()), + Box::new(LocalVoteSigner::default()), + ); + + // Set up dummy node to host a ReplayStage + let my_keypair = Keypair::new(); + let my_id = my_keypair.pubkey(); + let my_node = Node::new_localhost_with_pubkey(my_id); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); + + let mint = Mint::new_with_leader(10000, my_id, 500); + let bank = Arc::new(Bank::new(&mint)); + let (sender, receiver) = channel(); + + assert_eq!(signer.unsent_votes.read().unwrap().len(), 0); + signer + .send_validator_vote(&bank, &cluster_info, &sender) + .unwrap(); + assert_eq!(signer.unsent_votes.read().unwrap().len(), 1); + assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err()); + + signer + .send_validator_vote(&bank, &cluster_info, &sender) + .unwrap(); + assert_eq!(signer.unsent_votes.read().unwrap().len(), 2); + assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err()); + + bank.leader_scheduler + .write() + .unwrap() + .use_only_bootstrap_leader = true; + bank.leader_scheduler.write().unwrap().bootstrap_leader = my_id; + assert!(signer + .send_validator_vote(&bank, &cluster_info, &sender) + .is_ok()); + assert!(receiver.recv_timeout(Duration::from_millis(400)).is_ok()); + + assert_eq!(signer.unsent_votes.read().unwrap().len(), 0); + } +}