Store and resend votes if leader's TPU port is unknown (#2438)

* Store and resend votes if leader's TPU port is unknown

* fix build errors

* fix failing tests
This commit is contained in:
Pankaj Garg 2019-01-16 06:14:55 -08:00 committed by GitHub
parent 3282cb85ae
commit 03d4d1cb36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 98 additions and 18 deletions

View File

@ -78,6 +78,8 @@ pub struct VoteSignerProxy {
keypair: Arc<Keypair>,
signer: Box<VoteSigner + Send + Sync>,
pub vote_account: Pubkey,
last_leader: RwLock<Pubkey>,
unsent_votes: RwLock<Vec<Transaction>>,
}
impl VoteSignerProxy {
@ -91,6 +93,8 @@ impl VoteSignerProxy {
keypair: keypair.clone(),
signer,
vote_account,
last_leader: RwLock::new(vote_account),
unsent_votes: RwLock::new(vec![]),
}
}
@ -108,13 +112,44 @@ impl VoteSignerProxy {
cluster_info: &Arc<RwLock<ClusterInfo>>,
vote_blob_sender: &BlobSender,
) -> Result<()> {
let last_id = bank.last_id();
{
let (leader, _) = bank.get_current_leader().unwrap();
if let Ok(shared_blob) = self.new_signed_vote_blob(&last_id, bank, cluster_info) {
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
let mut old_leader = self.last_leader.write().unwrap();
if leader != *old_leader {
*old_leader = leader;
self.unsent_votes.write().unwrap().clear();
}
inc_new_counter_info!(
"validator-total_pending_votes",
self.unsent_votes.read().unwrap().len()
);
}
let tx = self.new_signed_vote_transaction(&bank.last_id(), bank.tick_height());
match VoteSignerProxy::get_leader_tpu(&bank, cluster_info) {
Ok(tpu) => {
self.unsent_votes.write().unwrap().retain(|old_tx| {
if let Ok(shared_blob) = self.new_signed_vote_blob(old_tx, tpu) {
inc_new_counter_info!("validator-pending_vote_sent", 1);
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob]).unwrap();
}
false
});
if let Ok(shared_blob) = self.new_signed_vote_blob(&tx, tpu) {
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
}
}
Err(_) => {
self.unsent_votes.write().unwrap().push(tx);
inc_new_counter_info!("validator-new_pending_vote", 1);
}
};
Ok(())
}
@ -137,21 +172,8 @@ impl VoteSignerProxy {
}
}
// TODO: Change voting to be on fixed tick intervals based on bank state
fn new_signed_vote_blob(
&self,
last_id: &Hash,
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<SharedBlob> {
fn new_signed_vote_blob(&self, tx: &Transaction, leader_tpu: SocketAddr) -> Result<SharedBlob> {
let shared_blob = SharedBlob::default();
let tick_height = bank.tick_height();
let leader_tpu = VoteSignerProxy::get_leader_tpu(&bank, cluster_info)?;
//TODO: doesn't seem like there is a synchronous call to get height and id
debug!("voting on {:?}", &last_id.as_ref()[..8]);
let tx = self.new_signed_vote_transaction(last_id, tick_height);
{
let mut blob = shared_blob.write().unwrap();
let bytes = serialize(&tx)?;
@ -179,3 +201,61 @@ impl VoteSignerProxy {
}
}
}
#[cfg(test)]
mod test {
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, Node};
use crate::mint::Mint;
use crate::vote_signer_proxy::VoteSignerProxy;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_vote_signer::rpc::LocalVoteSigner;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
#[test]
pub fn test_pending_votes() {
solana_logger::setup();
let signer = VoteSignerProxy::new(
&Arc::new(Keypair::new()),
Box::new(LocalVoteSigner::default()),
);
// Set up dummy node to host a ReplayStage
let my_keypair = Keypair::new();
let my_id = my_keypair.pubkey();
let my_node = Node::new_localhost_with_pubkey(my_id);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let mint = Mint::new_with_leader(10000, my_id, 500);
let bank = Arc::new(Bank::new(&mint));
let (sender, receiver) = channel();
assert_eq!(signer.unsent_votes.read().unwrap().len(), 0);
signer
.send_validator_vote(&bank, &cluster_info, &sender)
.unwrap();
assert_eq!(signer.unsent_votes.read().unwrap().len(), 1);
assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err());
signer
.send_validator_vote(&bank, &cluster_info, &sender)
.unwrap();
assert_eq!(signer.unsent_votes.read().unwrap().len(), 2);
assert!(receiver.recv_timeout(Duration::from_millis(400)).is_err());
bank.leader_scheduler
.write()
.unwrap()
.use_only_bootstrap_leader = true;
bank.leader_scheduler.write().unwrap().bootstrap_leader = my_id;
assert!(signer
.send_validator_vote(&bank, &cluster_info, &sender)
.is_ok());
assert!(receiver.recv_timeout(Duration::from_millis(400)).is_ok());
assert_eq!(signer.unsent_votes.read().unwrap().len(), 0);
}
}