Fix possibility of a vote error breaking ledger (#1768)

* Fix possibility of a vote error breaking ledger
* Add test

parent 15c00ea2ef
commit 4d98da44e3
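In short: the entry-processing helper whose signature changes in the first hunk below used to hand back the id of the last processed entry as its Result<Hash> return value, so any error propagated with `?`, including a failed vote, discarded that id even though the entries had already been applied to the bank and forwarded to the ledger writer. The commit threads the id through a `&mut Option<Hash>` out-parameter instead, and defers voting until the bookkeeping is done. A minimal, self-contained sketch of the difference, with hypothetical stand-in types in place of the real Bank and Hash machinery:

#[derive(Debug)]
enum Error {
    VoteError,
}

type Result<T> = std::result::Result<T, Error>;

// Before: the last entry id travels in the return value, so a `?` on a
// failed vote discards it even though the entries were already written.
fn process_before(entries: &[u64], vote_ok: bool) -> Result<u64> {
    let last_id = *entries.last().expect("entries cannot be empty");
    if !vote_ok {
        return Err(Error::VoteError); // last_id is lost here
    }
    Ok(last_id)
}

// After: the caller's copy is updated in place before the fallible vote,
// so a vote error can no longer desynchronize the recorded last entry id.
fn process_after(entries: &[u64], vote_ok: bool, last_entry_id: &mut Option<u64>) -> Result<()> {
    *last_entry_id = Some(*entries.last().expect("entries cannot be empty"));
    if !vote_ok {
        return Err(Error::VoteError); // already recorded; ledger stays consistent
    }
    Ok(())
}

fn main() {
    let entries = [1, 2, 3];

    // Old shape: the failed vote swallows the last entry id.
    assert!(process_before(&entries, false).is_err());

    // New shape: the id survives the failed vote.
    let mut last_entry_id = None;
    assert!(process_after(&entries, false, &mut last_entry_id).is_err());
    assert_eq!(last_entry_id, Some(3));
}

With the out-parameter shape, the caller's view of the ledger tail stays correct no matter which fallible step trips.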
@@ -61,7 +61,8 @@ impl ReplicateStage {
         vote_blob_sender: Option<&BlobSender>,
         ledger_entry_sender: &EntrySender,
         entry_height: &mut u64,
-    ) -> Result<Hash> {
+        last_entry_id: &mut Option<Hash>,
+    ) -> Result<()> {
         let timer = Duration::new(1, 0);
         //coalesce all the available entries into a single vote
         let mut entries = window_receiver.recv_timeout(timer)?;
@@ -76,49 +77,45 @@ impl ReplicateStage {
         );
 
         let mut res = Ok(());
-        let last_entry_id = {
-            let mut num_entries_to_write = entries.len();
-            let (current_leader, _) = bank
-                .get_current_leader()
-                .expect("Scheduled leader id should never be unknown while processing entries");
-            for (i, entry) in entries.iter().enumerate() {
-                res = bank.process_entry(&entry);
-                let my_id = keypair.pubkey();
-                let (scheduled_leader, _) = bank
-                    .get_current_leader()
-                    .expect("Scheduled leader id should never be unknown while processing entries");
-
-                // TODO: Remove this soon once we boot the leader from ClusterInfo
-                if scheduled_leader != current_leader {
-                    cluster_info.write().unwrap().set_leader(scheduled_leader);
-                }
-                if my_id == scheduled_leader {
-                    num_entries_to_write = i + 1;
-                    break;
-                }
-
-                if res.is_err() {
-                    // TODO: This will return early from the first entry that has an erroneous
-                    // transaction, instead of processing the rest of the entries in the vector
-                    // of received entries. This is in line with previous behavior when
-                    // bank.process_entries() was used to process the entries, but doesn't solve the
-                    // issue that the bank state was still changed, leading to inconsistencies with the
-                    // leader, as the leader currently should not be publishing erroneous transactions
-                    break;
-                }
-            }
-
-            // If leader rotation happened, only write the entries up to leader rotation.
-            entries.truncate(num_entries_to_write);
-            entries
-                .last()
-                .expect("Entries cannot be empty at this point")
-                .id
-        };
-
-        if let Some(sender) = vote_blob_sender {
-            send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
-        }
+        let mut num_entries_to_write = entries.len();
+        let (current_leader, _) = bank
+            .get_current_leader()
+            .expect("Scheduled leader id should never be unknown while processing entries");
+        for (i, entry) in entries.iter().enumerate() {
+            res = bank.process_entry(&entry);
+            let my_id = keypair.pubkey();
+            let (scheduled_leader, _) = bank
+                .get_current_leader()
+                .expect("Scheduled leader id should never be unknown while processing entries");
+
+            // TODO: Remove this soon once we boot the leader from ClusterInfo
+            if scheduled_leader != current_leader {
+                cluster_info.write().unwrap().set_leader(scheduled_leader);
+            }
+            if my_id == scheduled_leader {
+                num_entries_to_write = i + 1;
+                break;
+            }
+
+            if res.is_err() {
+                // TODO: This will return early from the first entry that has an erroneous
+                // transaction, instead of processing the rest of the entries in the vector
+                // of received entries. This is in line with previous behavior when
+                // bank.process_entries() was used to process the entries, but doesn't solve the
+                // issue that the bank state was still changed, leading to inconsistencies with the
+                // leader, as the leader currently should not be publishing erroneous transactions
+                break;
+            }
+        }
+
+        // If leader rotation happened, only write the entries up to leader rotation.
+        entries.truncate(num_entries_to_write);
+        *last_entry_id = Some(
+            entries
+                .last()
+                .expect("Entries cannot be empty at this point")
+                .id,
+        );
 
         inc_new_counter_info!(
             "replicate-transactions",
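Setting aside the indentation churn from removing the old `let last_entry_id = { ... }` block, the loop's control flow is unchanged: process entries one at a time, stop early at leader rotation or on the first failing entry, then truncate to the prefix that should be written. Note that truncation only shortens the batch in the rotation case; on a plain entry error the whole batch still goes out, which is exactly the inconsistency the TODO above describes. A distilled sketch of that pattern, with hypothetical closures standing in for bank.process_entry and the leader check:

// Sketch: process a batch, stopping at leader rotation or on the first
// erroneous entry, then keep only the prefix that should be written.
fn process_prefix(
    entries: &mut Vec<u64>,
    mut process: impl FnMut(u64) -> Result<(), ()>,
    mut hit_rotation: impl FnMut(usize) -> bool,
) -> Result<(), ()> {
    let mut res = Ok(());
    let mut num_entries_to_write = entries.len();
    for (i, &entry) in entries.iter().enumerate() {
        res = process(entry);
        if hit_rotation(i) {
            // The rotation entry itself is the last one this node owns.
            num_entries_to_write = i + 1;
            break;
        }
        if res.is_err() {
            // Stop processing; num_entries_to_write is untouched, so the
            // full batch is still written (the inconsistency the TODO notes).
            break;
        }
    }
    entries.truncate(num_entries_to_write);
    res
}

fn main() {
    // Rotation after index 1: only the first two entries survive.
    let mut entries = vec![10, 20, 30, 40];
    assert!(process_prefix(&mut entries, |_| Ok(()), |i| i == 1).is_ok());
    assert_eq!(entries, vec![10, 20]);
}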
@@ -136,7 +133,11 @@ impl ReplicateStage {
 
         *entry_height += entries_len;
         res?;
-        Ok(last_entry_id)
+        if let Some(sender) = vote_blob_sender {
+            send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
+        }
+
+        Ok(())
     }
 
     pub fn new(
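This hunk is the other half of the fix. The vote used to be sent in the middle of processing, before the function had returned the last entry id; now it runs only after res? and the entry_height update, so a vote error that propagates via ? can no longer leave the caller's bookkeeping behind the entries that were already written. A tiny sketch of the ordering, with a hypothetical closure in place of send_validator_vote:

// Sketch: mutate the caller-visible state first, vote last, so an Err from
// the vote leaves the recorded entry height intact.
fn finish(
    entry_height: &mut u64,
    entries_len: u64,
    vote: impl Fn() -> Result<(), ()>,
) -> Result<(), ()> {
    *entry_height += entries_len; // bookkeeping first
    vote()?; // fallible vote last
    Ok(())
}

fn main() {
    let mut entry_height = 5;
    assert!(finish(&mut entry_height, 3, || Err(())).is_err());
    assert_eq!(entry_height, 8); // height advanced despite the failed vote
}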
@@ -197,13 +198,12 @@ impl ReplicateStage {
                        vote_sender,
                        &ledger_entry_sender,
                        &mut entry_height_,
+                        &mut last_entry_id,
                    ) {
                        Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
                        Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
                        Err(e) => error!("{:?}", e),
-                        Ok(last_entry_id_) => {
-                            last_entry_id = Some(last_entry_id_);
-                        }
+                        Ok(()) => (),
                    }
                }
@@ -234,16 +234,19 @@ mod test {
     use cluster_info::{ClusterInfo, Node};
     use entry::Entry;
     use fullnode::Fullnode;
+    use hash::Hash;
     use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
-    use ledger::{create_tmp_sample_ledger, LedgerWriter};
+    use ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter};
     use logger;
     use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
+    use result::Error;
     use service::Service;
     use signature::{Keypair, KeypairUtil};
     use std::fs::remove_dir_all;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
+    use vote_stage::{send_validator_vote, VoteError};
 
     #[test]
     pub fn test_replicate_stage_leader_rotation_exit() {
@@ -309,7 +312,7 @@ mod test {
         // Set up the replicate stage
         let (entry_sender, entry_receiver) = channel();
         let exit = Arc::new(AtomicBool::new(false));
-        let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new(
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
            Arc::new(my_keypair),
            Arc::new(vote_account_keypair),
            Arc::new(bank),
@@ -335,11 +338,233 @@ mod test {
 
         // Add on the only entries that weren't ticks to the bootstrap height to get the
         // total expected entry length
+        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
         let expected_entry_height =
             bootstrap_height + initial_non_tick_height + active_set_entries_len;
-        let expected_last_id =
-            entries_to_send[(bootstrap_height - initial_tick_height - 1) as usize].id;
-        entry_sender.send(entries_to_send).unwrap();
+        let expected_last_id = entries_to_send[leader_rotation_index].id;
+        entry_sender.send(entries_to_send.clone()).unwrap();
 
         // Wait for replicate_stage to exit and check return value is correct
         assert_eq!(
             Some(ReplicateStageReturnType::LeaderRotation(
                 bootstrap_height,
                 expected_entry_height,
                 expected_last_id,
             )),
             replicate_stage.join().expect("replicate stage join")
         );
 
+        // Check that the entries on the ledger writer channel are correct
+        let received_ticks = ledger_writer_recv
+            .recv()
+            .expect("Expected to receive an entry on the ledger writer receiver");
+
+        assert_eq!(
+            &received_ticks[..],
+            &entries_to_send[..leader_rotation_index + 1]
+        );
+
         assert_eq!(exit.load(Ordering::Relaxed), true);
 
         let _ignored = remove_dir_all(&my_ledger_path);
     }
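With ledger_writer_recv no longer discarded, the test above can assert that exactly the pre-rotation prefix of entries_to_send comes back out of the stage. The plumbing is ordinary std::sync::mpsc; a toy version of the send-then-assert shape, with a thread standing in for ReplicateStage:

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    // A stand-in "stage": receives a batch and forwards only the first n
    // entries, the way ReplicateStage truncates at leader rotation.
    let (entry_sender, entry_receiver) = channel::<Vec<u64>>();
    let (writer_sender, writer_receiver) = channel::<Vec<u64>>();
    let n = 2; // pretend leader rotation lands after two entries
    let stage = thread::spawn(move || {
        let mut batch = entry_receiver.recv().unwrap();
        batch.truncate(n);
        writer_sender.send(batch).unwrap();
    });

    let entries_to_send = vec![1, 2, 3, 4];
    entry_sender.send(entries_to_send.clone()).unwrap();

    // Assert the writer channel yields exactly the pre-rotation prefix.
    let received = writer_receiver.recv().unwrap();
    assert_eq!(&received[..], &entries_to_send[..n]);
    stage.join().unwrap();
}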
+
+    #[test]
+    fn test_vote_error_replicate_stage_correctness() {
+        // Set up dummy node to host a ReplicateStage
+        let my_keypair = Keypair::new();
+        let my_id = my_keypair.pubkey();
+        let my_node = Node::new_localhost_with_pubkey(my_id);
+
+        // Create keypair for the leader
+        let leader_id = Keypair::new().pubkey();
+        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
+
+        let num_ending_ticks = 0;
+        let (_, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+            "test_vote_error_replicate_stage_correctness",
+            10_000,
+            num_ending_ticks,
+            leader_id,
+            500,
+        );
+
+        let initial_entry_len = genesis_entries.len();
+
+        // Set up the bank
+        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+
+        // Set up the cluster info
+        let cluster_info_me = Arc::new(RwLock::new(
+            ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"),
+        ));
+
+        // Set up the replicate stage
+        let vote_account_keypair = Arc::new(Keypair::new());
+        let bank = Arc::new(bank);
+        let (entry_sender, entry_receiver) = channel();
+        let exit = Arc::new(AtomicBool::new(false));
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
+            Arc::new(my_keypair),
+            vote_account_keypair.clone(),
+            bank.clone(),
+            cluster_info_me.clone(),
+            entry_receiver,
+            exit.clone(),
+            initial_entry_len as u64,
+        );
+
+        // Vote sender should error because no leader contact info is found in the
+        // ClusterInfo
+        let (mock_sender, _mock_receiver) = channel();
+        let vote_err =
+            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
+        if let Err(Error::VoteError(vote_error)) = vote_err {
+            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
+        } else {
+            panic!("Expected validator vote to fail with LeaderInfoNotFound");
+        }
+
+        // Send ReplicateStage an entry, should see it on the ledger writer receiver
+        let next_tick = create_ticks(
+            1,
+            genesis_entries
+                .last()
+                .expect("Expected nonzero number of entries in genesis")
+                .id,
+        );
+        entry_sender
+            .send(next_tick.clone())
+            .expect("Error sending entry to ReplicateStage");
+        let received_tick = ledger_writer_recv
+            .recv()
+            .expect("Expected to receive an entry on the ledger writer receiver");
+
+        assert_eq!(next_tick, received_tick);
+        drop(entry_sender);
+        replicate_stage
+            .join()
+            .expect("Expect successful ReplicateStage exit");
+        let _ignored = remove_dir_all(&my_ledger_path);
+    }
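The test above, and the leader-rotation variant that follows, both force a vote failure up front: send_validator_vote is called while ClusterInfo has no leader contact info, and the exact error variant is pinned with an if let plus a panic! fallback. The assertion shape, reduced to hypothetical error enums:

#[derive(Debug, PartialEq)]
enum VoteError {
    LeaderInfoNotFound,
}

#[derive(Debug)]
enum Error {
    VoteError(VoteError),
}

fn send_vote_without_leader() -> Result<(), Error> {
    // Stand-in for send_validator_vote when ClusterInfo has no leader entry.
    Err(Error::VoteError(VoteError::LeaderInfoNotFound))
}

fn main() {
    // Pin the exact variant; any other outcome fails the test loudly.
    if let Err(Error::VoteError(vote_error)) = send_vote_without_leader() {
        assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
    } else {
        panic!("Expected validator vote to fail with LeaderInfoNotFound");
    }
}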
+
+    #[test]
+    fn test_vote_error_replicate_stage_leader_rotation() {
+        // Set up dummy node to host a ReplicateStage
+        let my_keypair = Keypair::new();
+        let my_id = my_keypair.pubkey();
+        let my_node = Node::new_localhost_with_pubkey(my_id);
+
+        // Create keypair for the leader
+        let leader_id = Keypair::new().pubkey();
+
+        // Create the ledger
+        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+            "test_vote_error_replicate_stage_leader_rotation",
+            10_000,
+            0,
+            leader_id,
+            500,
+        );
+
+        let mut last_id = genesis_entries
+            .last()
+            .expect("expected at least one genesis entry")
+            .id;
+
+        // Write two entries to the ledger so that the validator is in the active set:
+        // 1) Give the validator a nonzero number of tokens 2) A vote from the validator.
+        // This will cause leader rotation after the bootstrap height
+        let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
+        let (active_set_entries, vote_account_keypair) =
+            make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0);
+        last_id = active_set_entries.last().unwrap().id;
+        let initial_tick_height = genesis_entries
+            .iter()
+            .skip(2)
+            .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
+        let active_set_entries_len = active_set_entries.len() as u64;
+        let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
+        let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
+        ledger_writer.write_entries(&active_set_entries).unwrap();
+
+        // Set up the LeaderScheduler so that this node becomes the leader at
+        // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
+        let leader_rotation_interval = 10;
+        let num_bootstrap_slots = 2;
+        let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
+        let leader_scheduler_config = LeaderSchedulerConfig::new(
+            Some(bootstrap_height),
+            Some(leader_rotation_interval),
+            Some(leader_rotation_interval * 2),
+            Some(bootstrap_height),
+        );
+
+        let leader_scheduler =
+            Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
+
+        // Set up the bank
+        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+
+        // Set up the cluster info
+        let cluster_info_me = Arc::new(RwLock::new(
+            ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"),
+        ));
+
+        // Set up the replicate stage
+        let vote_account_keypair = Arc::new(vote_account_keypair);
+        let bank = Arc::new(bank);
+        let (entry_sender, entry_receiver) = channel();
+        let exit = Arc::new(AtomicBool::new(false));
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
+            Arc::new(my_keypair),
+            vote_account_keypair.clone(),
+            bank.clone(),
+            cluster_info_me.clone(),
+            entry_receiver,
+            exit.clone(),
+            initial_entry_len as u64,
+        );
+
+        // Vote sender should error because no leader contact info is found in the
+        // ClusterInfo
+        let (mock_sender, _mock_receiver) = channel();
+        let vote_err =
+            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
+        if let Err(Error::VoteError(vote_error)) = vote_err {
+            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
+        } else {
+            panic!("Expected validator vote to fail with LeaderInfoNotFound");
+        }
+
+        // Send enough ticks to trigger leader rotation
+        let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
+        let num_hashes = 1;
+
+        // Add on the only entries that weren't ticks to the bootstrap height to get the
+        // total expected entry length
+        let expected_entry_height =
+            bootstrap_height + initial_non_tick_height + active_set_entries_len;
+        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
+        let mut expected_last_id = Hash::default();
+        for i in 0..total_entries_to_send {
+            let entry = Entry::new(&mut last_id, num_hashes, vec![]);
+            last_id = entry.id;
+            entry_sender
+                .send(vec![entry.clone()])
+                .expect("Expected to be able to send entry to ReplicateStage");
+            // Check that the entries on the ledger writer channel are correct
+            let received_entry = ledger_writer_recv
+                .recv()
+                .expect("Expected to receive an entry on the ledger writer receiver");
+            assert_eq!(received_entry[0], entry);
+
+            if i == leader_rotation_index {
+                expected_last_id = entry.id;
+            }
+        }
+
+        assert_ne!(expected_last_id, Hash::default());
+
+        // Wait for replicate_stage to exit and check return value is correct
+        assert_eq!(
@@ -352,7 +577,6 @@ mod test {
         );
 
         assert_eq!(exit.load(Ordering::Relaxed), true);
-
         let _ignored = remove_dir_all(&my_ledger_path);
     }
 }