diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs
index 54da567737..c824377892 100644
--- a/src/replicate_stage.rs
+++ b/src/replicate_stage.rs
@@ -61,7 +61,8 @@ impl ReplicateStage {
         vote_blob_sender: Option<&BlobSender>,
         ledger_entry_sender: &EntrySender,
         entry_height: &mut u64,
-    ) -> Result<Hash> {
+        last_entry_id: &mut Option<Hash>,
+    ) -> Result<()> {
         let timer = Duration::new(1, 0);
         //coalesce all the available entries into a single vote
         let mut entries = window_receiver.recv_timeout(timer)?;
@@ -76,49 +77,45 @@
         );
 
         let mut res = Ok(());
-        let last_entry_id = {
-            let mut num_entries_to_write = entries.len();
-            let (current_leader, _) = bank
+        let mut num_entries_to_write = entries.len();
+        let (current_leader, _) = bank
+            .get_current_leader()
+            .expect("Scheduled leader id should never be unknown while processing entries");
+        for (i, entry) in entries.iter().enumerate() {
+            res = bank.process_entry(&entry);
+            let my_id = keypair.pubkey();
+            let (scheduled_leader, _) = bank
                 .get_current_leader()
                 .expect("Scheduled leader id should never be unknown while processing entries");
-            for (i, entry) in entries.iter().enumerate() {
-                res = bank.process_entry(&entry);
-                let my_id = keypair.pubkey();
-                let (scheduled_leader, _) = bank
-                    .get_current_leader()
-                    .expect("Scheduled leader id should never be unknown while processing entries");
-                // TODO: Remove this soon once we boot the leader from ClusterInfo
-                if scheduled_leader != current_leader {
-                    cluster_info.write().unwrap().set_leader(scheduled_leader);
-                }
-                if my_id == scheduled_leader {
-                    num_entries_to_write = i + 1;
-                    break;
-                }
-
-                if res.is_err() {
-                    // TODO: This will return early from the first entry that has an erroneous
-                    // transaction, instad of processing the rest of the entries in the vector
-                    // of received entries. This is in line with previous behavior when
-                    // bank.process_entries() was used to process the entries, but doesn't solve the
-                    // issue that the bank state was still changed, leading to inconsistencies with the
-                    // leader as the leader currently should not be publishing erroneous transactions
-                    break;
-                }
+            // TODO: Remove this soon once we boot the leader from ClusterInfo
+            if scheduled_leader != current_leader {
+                cluster_info.write().unwrap().set_leader(scheduled_leader);
+            }
+            if my_id == scheduled_leader {
+                num_entries_to_write = i + 1;
+                break;
             }
 
-            // If leader rotation happened, only write the entries up to leader rotation.
-            entries.truncate(num_entries_to_write);
+            if res.is_err() {
+                // TODO: This will return early from the first entry that has an erroneous
+                // transaction, instead of processing the rest of the entries in the vector
+                // of received entries. This is in line with previous behavior when
+                // bank.process_entries() was used to process the entries, but doesn't solve the
+                // issue that the bank state was still changed, leading to inconsistencies with the
+                // leader as the leader currently should not be publishing erroneous transactions
+                break;
+            }
+        }
+
+        // If leader rotation happened, only write the entries up to leader rotation.
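+        // Hand the id of the last written entry back to the caller through the new
+        // `last_entry_id` out-parameter instead of the return value.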
+        entries.truncate(num_entries_to_write);
+        *last_entry_id = Some(
             entries
                 .last()
                 .expect("Entries cannot be empty at this point")
-                .id
-        };
-
-        if let Some(sender) = vote_blob_sender {
-            send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
-        }
+                .id,
+        );
 
         inc_new_counter_info!(
             "replicate-transactions",
@@ -136,7 +133,11 @@ impl ReplicateStage {
         *entry_height += entries_len;
 
         res?;
-        Ok(last_entry_id)
+        if let Some(sender) = vote_blob_sender {
+            send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
+        }
+
+        Ok(())
     }
 
     pub fn new(
@@ -197,13 +198,12 @@ impl ReplicateStage {
                         vote_sender,
                         &ledger_entry_sender,
                         &mut entry_height_,
+                        &mut last_entry_id,
                     ) {
                         Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
                         Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
                         Err(e) => error!("{:?}", e),
-                        Ok(last_entry_id_) => {
-                            last_entry_id = Some(last_entry_id_);
-                        }
+                        Ok(()) => (),
                     }
                 }
 
@@ -234,16 +234,19 @@ mod test {
     use cluster_info::{ClusterInfo, Node};
     use entry::Entry;
     use fullnode::Fullnode;
+    use hash::Hash;
     use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
-    use ledger::{create_tmp_sample_ledger, LedgerWriter};
+    use ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter};
     use logger;
     use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
+    use result::Error;
     use service::Service;
     use signature::{Keypair, KeypairUtil};
    use std::fs::remove_dir_all;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
+    use vote_stage::{send_validator_vote, VoteError};
 
     #[test]
     pub fn test_replicate_stage_leader_rotation_exit() {
@@ -309,7 +312,7 @@
         // Set up the replicate stage
         let (entry_sender, entry_receiver) = channel();
         let exit = Arc::new(AtomicBool::new(false));
-        let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new(
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
             Arc::new(my_keypair),
             Arc::new(vote_account_keypair),
             Arc::new(bank),
@@ -335,11 +338,233 @@
 
         // Add on the only entries that weren't ticks to the bootstrap height to get the
         // total expected entry length
+        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
         let expected_entry_height =
             bootstrap_height + initial_non_tick_height + active_set_entries_len;
-        let expected_last_id =
-            entries_to_send[(bootstrap_height - initial_tick_height - 1) as usize].id;
-        entry_sender.send(entries_to_send).unwrap();
+        let expected_last_id = entries_to_send[leader_rotation_index].id;
+        entry_sender.send(entries_to_send.clone()).unwrap();
+
+        // Wait for replicate_stage to exit and check return value is correct
+        assert_eq!(
+            Some(ReplicateStageReturnType::LeaderRotation(
+                bootstrap_height,
+                expected_entry_height,
+                expected_last_id,
+            )),
+            replicate_stage.join().expect("replicate stage join")
+        );
+
+        // Check that the entries on the ledger writer channel are correct
+        let received_ticks = ledger_writer_recv
+            .recv()
+            .expect("Expected to receive an entry on the ledger writer receiver");
+
+        assert_eq!(
+            &received_ticks[..],
+            &entries_to_send[..leader_rotation_index + 1]
+        );
+
+        assert_eq!(exit.load(Ordering::Relaxed), true);
+
+        let _ignored = remove_dir_all(&my_ledger_path);
+    }
+
+    #[test]
+    fn test_vote_error_replicate_stage_correctness() {
+        // Set up dummy node to host a ReplicateStage
+        let my_keypair = Keypair::new();
+        let my_id = my_keypair.pubkey();
+        let my_node = Node::new_localhost_with_pubkey(my_id);
+
+        // Create keypair for the leader
+        let leader_id = Keypair::new().pubkey();
+        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
+
+        let num_ending_ticks = 0;
+        let (_, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+            "test_vote_error_replicate_stage_correctness",
+            10_000,
+            num_ending_ticks,
+            leader_id,
+            500,
+        );
+
+        let initial_entry_len = genesis_entries.len();
+
+        // Set up the bank
+        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+
+        // Set up the cluster info
+        let cluster_info_me = Arc::new(RwLock::new(
+            ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"),
+        ));
+
+        // Set up the replicate stage
+        let vote_account_keypair = Arc::new(Keypair::new());
+        let bank = Arc::new(bank);
+        let (entry_sender, entry_receiver) = channel();
+        let exit = Arc::new(AtomicBool::new(false));
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
+            Arc::new(my_keypair),
+            vote_account_keypair.clone(),
+            bank.clone(),
+            cluster_info_me.clone(),
+            entry_receiver,
+            exit.clone(),
+            initial_entry_len as u64,
+        );
+
+        // Vote sender should error because no leader contact info is found in the
+        // ClusterInfo
+        let (mock_sender, _mock_receiver) = channel();
+        let vote_err =
+            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
+        if let Err(Error::VoteError(vote_error)) = vote_err {
+            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
+        } else {
+            panic!("Expected validator vote to fail with LeaderInfoNotFound");
+        }
+
+        // Send ReplicateStage an entry, should see it on the ledger writer receiver
+        let next_tick = create_ticks(
+            1,
+            genesis_entries
+                .last()
+                .expect("Expected nonzero number of entries in genesis")
+                .id,
+        );
+        entry_sender
+            .send(next_tick.clone())
+            .expect("Error sending entry to ReplicateStage");
+        let received_tick = ledger_writer_recv
+            .recv()
+            .expect("Expected to receive an entry on the ledger writer receiver");
+
+        assert_eq!(next_tick, received_tick);
+        drop(entry_sender);
+        replicate_stage
+            .join()
+            .expect("Expect successful ReplicateStage exit");
+        let _ignored = remove_dir_all(&my_ledger_path);
+    }
+
+    #[test]
+    fn test_vote_error_replicate_stage_leader_rotation() {
+        // Set up dummy node to host a ReplicateStage
+        let my_keypair = Keypair::new();
+        let my_id = my_keypair.pubkey();
+        let my_node = Node::new_localhost_with_pubkey(my_id);
+
+        // Create keypair for the leader
+        let leader_id = Keypair::new().pubkey();
+
+        // Create the ledger
+        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+            "test_vote_error_replicate_stage_leader_rotation",
+            10_000,
+            0,
+            leader_id,
+            500,
+        );
+
+        let mut last_id = genesis_entries
+            .last()
+            .expect("expected at least one genesis entry")
+            .id;
+
+        // Write two entries to the ledger so that the validator is in the active set:
+        // 1) Give the validator a nonzero number of tokens 2) A vote from the validator.
+        // This will cause leader rotation after the bootstrap height
+        let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
+        let (active_set_entries, vote_account_keypair) =
+            make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0);
+        last_id = active_set_entries.last().unwrap().id;
+        let initial_tick_height = genesis_entries
+            .iter()
+            .skip(2)
+            .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
+        let active_set_entries_len = active_set_entries.len() as u64;
+        let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
+        let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
+        ledger_writer.write_entries(&active_set_entries).unwrap();
+
+        // Set up the LeaderScheduler so that this node becomes the leader at
+        // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
+        let leader_rotation_interval = 10;
+        let num_bootstrap_slots = 2;
+        let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
+        let leader_scheduler_config = LeaderSchedulerConfig::new(
+            Some(bootstrap_height),
+            Some(leader_rotation_interval),
+            Some(leader_rotation_interval * 2),
+            Some(bootstrap_height),
+        );
+
+        let leader_scheduler =
+            Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
+
+        // Set up the bank
+        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+
+        // Set up the cluster info
+        let cluster_info_me = Arc::new(RwLock::new(
+            ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"),
+        ));
+
+        // Set up the replicate stage
+        let vote_account_keypair = Arc::new(vote_account_keypair);
+        let bank = Arc::new(bank);
+        let (entry_sender, entry_receiver) = channel();
+        let exit = Arc::new(AtomicBool::new(false));
+        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
+            Arc::new(my_keypair),
+            vote_account_keypair.clone(),
+            bank.clone(),
+            cluster_info_me.clone(),
+            entry_receiver,
+            exit.clone(),
+            initial_entry_len as u64,
+        );
+
+        // Vote sender should error because no leader contact info is found in the
+        // ClusterInfo
+        let (mock_sender, _mock_receiver) = channel();
+        let vote_err =
+            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
+        if let Err(Error::VoteError(vote_error)) = vote_err {
+            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
+        } else {
+            panic!("Expected validator vote to fail with LeaderInfoNotFound");
+        }
+
+        // Send enough ticks to trigger leader rotation
+        let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
+        let num_hashes = 1;
+
+        // Add on the only entries that weren't ticks to the bootstrap height to get the
+        // total expected entry length
+        let expected_entry_height =
+            bootstrap_height + initial_non_tick_height + active_set_entries_len;
+        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
+        let mut expected_last_id = Hash::default();
+        for i in 0..total_entries_to_send {
+            let entry = Entry::new(&mut last_id, num_hashes, vec![]);
+            last_id = entry.id;
+            entry_sender
+                .send(vec![entry.clone()])
+                .expect("Expected to be able to send entry to ReplicateStage");
+            // Check that the entries on the ledger writer channel are correct
+            let received_entry = ledger_writer_recv
+                .recv()
+                .expect("Expected to receive an entry on the ledger writer receiver");
+            assert_eq!(received_entry[0], entry);
+
+            if i == leader_rotation_index {
+                expected_last_id = entry.id;
+            }
+        }
+
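+        // The loop above must have reached the rotation entry; otherwise
+        // expected_last_id would still be the default placeholder hash.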
+        assert_ne!(expected_last_id, Hash::default());
 
         // Wait for replicate_stage to exit and check return value is correct
         assert_eq!(
@@ -352,7 +577,6 @@
         );
 
         assert_eq!(exit.load(Ordering::Relaxed), true);
-
         let _ignored = remove_dir_all(&my_ledger_path);
     }
 }