Vote every number of ticks (#2141)

* Vote every number of ticks

* address review comments

* fix for failing leader rotation tests

* remove check for vote failure from replay tests (votes will be cached and transmitted once a leader is available)
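
The core mechanical change: instead of gating votes on a wall-clock timer inside the replay loop ("Only vote once a second"), the validator now votes whenever the bank's tick height reaches a multiple of the new BLOCK_TICK_COUNT constant. A minimal sketch of that condition, using a hypothetical should_vote helper rather than the actual ReplayStage code:

pub const BLOCK_TICK_COUNT: u64 = 8;

// Vote once every BLOCK_TICK_COUNT ticks, independent of wall-clock time.
fn should_vote(tick_height: u64) -> bool {
    tick_height % BLOCK_TICK_COUNT == 0
}

fn main() {
    // Over 32 ticks, a validator following this rule votes 4 times.
    let votes = (1..=32).filter(|&t| should_vote(t)).count();
    assert_eq!(votes, 4);
}

This ties vote cadence to ledger progress (ticks) rather than to real time, which is what lets the once-a-second timer bookkeeping in ReplayStage be removed in the diff below.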
Pankaj Garg 2018-12-13 18:43:10 -08:00 committed by GitHub
parent 85398c728a
commit 091b21fae7
3 changed files with 38 additions and 44 deletions

compute_leader_finality_service.rs

@@ -21,7 +21,7 @@ pub enum FinalityError {
NoValidSupermajority,
}
pub const COMPUTE_FINALITY_MS: u64 = 1000;
pub const COMPUTE_FINALITY_MS: u64 = 100;
pub struct ComputeLeaderFinalityService {
compute_finality_thread: JoinHandle<()>,

replay_stage.rs

@@ -25,6 +25,9 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
pub const BLOCK_TICK_COUNT: u64 = 8;
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ReplayStageReturnType {
LeaderRotation(u64, u64, Hash),
@@ -71,6 +74,9 @@ impl ReplayStage {
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
entries.append(&mut more);
if entries.len() >= MAX_ENTRY_RECV_PER_ITER {
break;
}
}
submit(
@@ -94,9 +100,25 @@
let (current_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
let my_id = keypair.pubkey();
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
let my_id = keypair.pubkey();
if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instead of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}
if bank.tick_height() % BLOCK_TICK_COUNT == 0 {
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, vote_account_keypair, &cluster_info, sender).unwrap();
}
}
let (scheduled_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
@@ -105,20 +127,11 @@
if scheduled_leader != current_leader {
cluster_info.write().unwrap().set_leader(scheduled_leader);
}
if my_id == scheduled_leader {
num_entries_to_write = i + 1;
break;
}
if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instead of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}
}
// If leader rotation happened, only write the entries up to leader rotation.
@@ -134,7 +147,6 @@
);
let entries_len = entries.len() as u64;
// TODO: move this to another stage?
// TODO: In line with previous behavior, this will write all the entries even if
// an error occurred processing one of the entries (causing the rest of the entries to
// not be processed).
@@ -144,9 +156,10 @@
*entry_height += entries_len;
res?;
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
}
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);
Ok(())
}
@@ -173,8 +186,6 @@
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit);
let now = Instant::now();
let mut next_vote_secs = 1;
let mut entry_height_ = entry_height;
let mut last_entry_id = last_entry_id;
loop {
@@ -194,21 +205,13 @@
));
}
// Only vote once a second.
let vote_sender = if now.elapsed().as_secs() > next_vote_secs {
next_vote_secs += 1;
Some(&vote_blob_sender)
} else {
None
};
match Self::process_entries(
&bank,
&cluster_info,
&window_receiver,
&keypair,
&vote_account_keypair,
vote_sender,
Some(&vote_blob_sender),
&ledger_entry_sender,
&mut entry_height_,
&mut last_entry_id,
@@ -258,7 +261,7 @@ mod test {
use crate::replay_stage::{ReplayStage, ReplayStageReturnType};
use crate::result::Error;
use crate::service::Service;
use crate::vote_stage::{send_validator_vote, VoteError};
use crate::vote_stage::send_validator_vote;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
@@ -437,13 +440,8 @@
// Vote sender should error because no leader contact info is found in the
// ClusterInfo
let (mock_sender, _mock_receiver) = channel();
let vote_err =
let _vote_err =
send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
if let Err(Error::VoteError(vote_error)) = vote_err {
assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
} else {
panic!("Expected validator vote to fail with LeaderInfoNotFound");
}
// Send ReplayStage an entry, should see it on the ledger writer receiver
let next_tick = create_ticks(
@@ -549,13 +547,8 @@
// Vote sender should error because no leader contact info is found in the
// ClusterInfo
let (mock_sender, _mock_receiver) = channel();
let vote_err =
let _vote_err =
send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
if let Err(Error::VoteError(vote_error)) = vote_err {
assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
} else {
panic!("Expected validator vote to fail with LeaderInfoNotFound");
}
// Send enough ticks to trigger leader rotation
let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
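
Besides the tick-gated vote, the replay_stage.rs hunks above also cap how many entries a single iteration will drain from the window receiver (MAX_ENTRY_RECV_PER_ITER). A self-contained sketch of that batching, with u64 values standing in for entry vectors and a hypothetical recv_batch helper:

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

const MAX_ENTRY_RECV_PER_ITER: usize = 512;

// Block for the first batch, then opportunistically drain whatever else is
// queued, stopping once the batch reaches the per-iteration cap.
fn recv_batch(rx: &Receiver<Vec<u64>>) -> Result<Vec<u64>, RecvTimeoutError> {
    let mut batch = rx.recv_timeout(Duration::from_millis(100))?;
    while let Ok(mut more) = rx.try_recv() {
        batch.append(&mut more);
        if batch.len() >= MAX_ENTRY_RECV_PER_ITER {
            break;
        }
    }
    Ok(batch)
}

fn main() {
    let (tx, rx) = channel();
    for chunk in 0..10 {
        tx.send((chunk * 100..(chunk + 1) * 100).collect()).unwrap();
    }
    // 100 entries from the blocking recv, then appends of 100 until the cap
    // is crossed: 600 in total for this input.
    assert_eq!(recv_batch(&rx).unwrap().len(), 600);
}

Note the cap is checked after each append, matching the diff: a batch can overshoot MAX_ENTRY_RECV_PER_ITER by at most one received chunk.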

vote_stage.rs

@@ -74,9 +74,10 @@ pub fn send_validator_vote(
) -> Result<()> {
let last_id = bank.last_id();
let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?;
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)
{
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
}
Ok(())
}
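
The vote_stage.rs change above is what lets the two LeaderInfoNotFound assertions be dropped from the tests: failing to build the signed vote blob (for example, when no leader contact info is in ClusterInfo yet) is no longer returned to ReplayStage as an error; the vote is simply not sent on that attempt and voting is retried at a later tick boundary. A rough sketch of the pattern with stand-in types rather than the real Solana API:

// Hypothetical stand-in for create_new_signed_vote_blob().
fn build_vote_blob(have_leader: bool) -> Result<Vec<u8>, &'static str> {
    if have_leader {
        Ok(vec![1, 2, 3])
    } else {
        Err("LeaderInfoNotFound")
    }
}

// Hypothetical stand-in for send_validator_vote(): blob-creation failures
// are swallowed, while send failures would still propagate.
fn send_vote(have_leader: bool, sent: &mut Vec<Vec<u8>>) -> Result<(), &'static str> {
    if let Ok(blob) = build_vote_blob(have_leader) {
        sent.push(blob);
    }
    Ok(())
}

fn main() {
    let mut sent = Vec::new();
    assert!(send_vote(false, &mut sent).is_ok()); // no leader: no error, nothing sent
    assert!(send_vote(true, &mut sent).is_ok());  // leader known: vote goes out
    assert_eq!(sent.len(), 1);
}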