//! The `replicate_stage` replicates transactions broadcast by the leader.

use bank::Bank;
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::{EntryReceiver, EntrySender};
use solana_sdk::hash::Hash;
use ledger::Block;
use log::Level;
use packet::BlobError;
use result::{Error, Result};
use service::Service;
use solana_metrics::{influxdb, submit};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
use streamer::{responder, BlobSender};
use vote_stage::send_validator_vote;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ReplicateStageReturnType {
    LeaderRotation(u64, u64, Hash),
}

// Implement a destructor for the ReplicateStage thread to signal it exited
// even on panics
struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}

// Implement a destructor for Finalizer.
impl Drop for Finalizer {
    fn drop(&mut self) {
        self.exit_sender.clone().store(true, Ordering::Relaxed);
    }
}

pub struct ReplicateStage {
    t_responder: JoinHandle<()>,
    t_replicate: JoinHandle<Option<ReplicateStageReturnType>>,
}

impl ReplicateStage {
    /// Process entry blobs, already in order
    fn replicate_requests(
        bank: &Arc<Bank>,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        window_receiver: &EntryReceiver,
        keypair: &Arc<Keypair>,
        vote_account_keypair: &Arc<Keypair>,
        vote_blob_sender: Option<&BlobSender>,
        ledger_entry_sender: &EntrySender,
        entry_height: &mut u64,
        last_entry_id: &mut Hash,
    ) -> Result<()> {
        let timer = Duration::new(1, 0);
        // Coalesce all the available entries into a single vote
        let mut entries = window_receiver.recv_timeout(timer)?;
        while let Ok(mut more) = window_receiver.try_recv() {
            entries.append(&mut more);
        }

        submit(
            influxdb::Point::new("replicate-stage")
                .add_field("count", influxdb::Value::Integer(entries.len() as i64))
                .to_owned(),
        );

        let mut res = Ok(());
        let mut num_entries_to_write = entries.len();
        let now = Instant::now();
        if !entries.as_slice().verify(last_entry_id) {
            inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
            return Err(Error::BlobError(BlobError::VerificationFailed));
        }
        inc_new_counter_info!(
            "replicate_stage-verify-duration",
            duration_as_ms(&now.elapsed()) as usize
        );

        let (current_leader, _) = bank
            .get_current_leader()
            .expect("Scheduled leader id should never be unknown while processing entries");
        for (i, entry) in entries.iter().enumerate() {
            res = bank.process_entry(&entry);
            let my_id = keypair.pubkey();
            let (scheduled_leader, _) = bank
                .get_current_leader()
                .expect("Scheduled leader id should never be unknown while processing entries");

            // TODO: Remove this soon once we boot the leader from ClusterInfo
            if scheduled_leader != current_leader {
                cluster_info.write().unwrap().set_leader(scheduled_leader);
            }

            if my_id == scheduled_leader {
                num_entries_to_write = i + 1;
                break;
            }

            if res.is_err() {
                // TODO: This will return early from the first entry that has an erroneous
                // transaction, instead of processing the rest of the entries in the vector
                // of received entries. This is in line with previous behavior when
                // bank.process_entries() was used to process the entries, but it doesn't
                // solve the issue that the bank state was still changed, leading to
                // inconsistencies with the leader, as the leader currently should not be
                // publishing erroneous transactions.
                break;
            }
        }
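
        // `num_entries_to_write` is only shortened by leader rotation; an entry that
        // fails to process leaves it at the full batch length (see the TODO above).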
        // If leader rotation happened, only write the entries up to leader rotation.
        entries.truncate(num_entries_to_write);
        *last_entry_id = entries
            .last()
            .expect("Entries cannot be empty at this point")
            .id;

        inc_new_counter_info!(
            "replicate-transactions",
            entries.iter().map(|x| x.transactions.len()).sum()
        );

        let entries_len = entries.len() as u64;
        // TODO: move this to another stage?
        // TODO: In line with previous behavior, this will write all the entries even if
        // an error occurred processing one of the entries (causing the rest of the entries to
        // not be processed).
        if entries_len != 0 {
            ledger_entry_sender.send(entries)?;
        }

        *entry_height += entries_len;
        res?;
        if let Some(sender) = vote_blob_sender {
            send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
        }
        Ok(())
    }

    pub fn new(
        keypair: Arc<Keypair>,
        vote_account_keypair: Arc<Keypair>,
        bank: Arc<Bank>,
        cluster_info: Arc<RwLock<ClusterInfo>>,
        window_receiver: EntryReceiver,
        exit: Arc<AtomicBool>,
        entry_height: u64,
        last_entry_id: Hash,
    ) -> (Self, EntryReceiver) {
        let (vote_blob_sender, vote_blob_receiver) = channel();
        let (ledger_entry_sender, ledger_entry_receiver) = channel();
        let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
        let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver);
        let t_replicate = Builder::new()
            .name("solana-replicate-stage".to_string())
            .spawn(move || {
                let _exit = Finalizer::new(exit);
                let now = Instant::now();
                let mut next_vote_secs = 1;
                let mut entry_height_ = entry_height;
                let mut last_entry_id = last_entry_id;
                loop {
                    let (leader_id, _) = bank
                        .get_current_leader()
                        .expect("Scheduled leader id should never be unknown at this point");

                    if leader_id == keypair.pubkey() {
                        return Some(ReplicateStageReturnType::LeaderRotation(
                            bank.tick_height(),
                            entry_height_,
                            // We should never start the TPU / this stage on an exact entry
                            // that causes leader rotation (Fullnode should automatically
                            // transition on startup if it detects that we are no longer a
                            // validator), hence we can assume that some entry must have
                            // triggered leader rotation.
                            last_entry_id,
                        ));
                    }

                    // Only vote once a second.
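                    // `next_vote_secs` is bumped each time the sender is handed off, so
                    // replicate_requests is given the vote blob sender at most once per
                    // second of elapsed time since the stage started.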
                    let vote_sender = if now.elapsed().as_secs() > next_vote_secs {
                        next_vote_secs += 1;
                        Some(&vote_blob_sender)
                    } else {
                        None
                    };

                    match Self::replicate_requests(
                        &bank,
                        &cluster_info,
                        &window_receiver,
                        &keypair,
                        &vote_account_keypair,
                        vote_sender,
                        &ledger_entry_sender,
                        &mut entry_height_,
                        &mut last_entry_id,
                    ) {
                        Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
                        Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
                        Err(e) => error!("{:?}", e),
                        Ok(()) => (),
                    }
                }

                None
            }).unwrap();

        (
            ReplicateStage {
                t_responder,
                t_replicate,
            },
            ledger_entry_receiver,
        )
    }
}

impl Service for ReplicateStage {
    type JoinReturnType = Option<ReplicateStageReturnType>;

    fn join(self) -> thread::Result<Option<ReplicateStageReturnType>> {
        self.t_responder.join()?;
        self.t_replicate.join()
    }
}

#[cfg(test)]
mod test {
    use bank::Bank;
    use cluster_info::{ClusterInfo, Node};
    use entry::Entry;
    use fullnode::Fullnode;
    use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
    use ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter};
    use logger;
    use packet::BlobError;
    use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
    use result::Error;
    use service::Service;
    use solana_sdk::hash::Hash;
    use solana_sdk::signature::{Keypair, KeypairUtil};
    use std::fs::remove_dir_all;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, RwLock};
    use vote_stage::{send_validator_vote, VoteError};

    #[test]
    pub fn test_replicate_stage_leader_rotation_exit() {
        logger::setup();

        // Set up dummy node to host a ReplicateStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let my_node = Node::new_localhost_with_pubkey(my_id);
        let cluster_info_me = ClusterInfo::new(my_node.info.clone());

        // Create keypair for the old leader
        let old_leader_id = Keypair::new().pubkey();

        // Create a ledger
        let num_ending_ticks = 1;
        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
            "test_replicate_stage_leader_rotation_exit",
            10_000,
            num_ending_ticks,
            old_leader_id,
            500,
        );

        let mut last_id = genesis_entries
            .last()
            .expect("expected at least one genesis entry")
            .id;

        // Write two entries to the ledger so that the validator is in the active set:
        // 1) Give the validator a nonzero number of tokens, 2) a vote from the validator.
        // This will cause leader rotation after the bootstrap height.
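        // make_active_set_entries (below) generates both of those entries; without
        // them the LeaderScheduler would not count this validator when choosing the
        // next leader.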
        let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
        let (active_set_entries, vote_account_keypair) =
            make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0);
        last_id = active_set_entries.last().unwrap().id;
        let initial_tick_height = genesis_entries
            .iter()
            .skip(2)
            .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
        let active_set_entries_len = active_set_entries.len() as u64;
        let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
        let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
        ledger_writer.write_entries(&active_set_entries).unwrap();

        // Set up the LeaderScheduler so that this node becomes the leader at
        // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
        let leader_rotation_interval = 10;
        let num_bootstrap_slots = 2;
        let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
        let leader_scheduler_config = LeaderSchedulerConfig::new(
            Some(bootstrap_height),
            Some(leader_rotation_interval),
            Some(leader_rotation_interval * 2),
            Some(bootstrap_height),
        );

        let leader_scheduler =
            Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));

        // Set up the bank
        let (bank, _, last_entry_id) =
            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);

        // Set up the replicate stage
        let (entry_sender, entry_receiver) = channel();
        let exit = Arc::new(AtomicBool::new(false));
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
            Arc::new(my_keypair),
            Arc::new(vote_account_keypair),
            Arc::new(bank),
            Arc::new(RwLock::new(cluster_info_me)),
            entry_receiver,
            exit.clone(),
            initial_entry_len,
            last_entry_id,
        );

        // Send enough ticks to trigger leader rotation
        let extra_entries = leader_rotation_interval;
        let total_entries_to_send = (bootstrap_height + extra_entries) as usize;
        let num_hashes = 1;
        let mut entries_to_send = vec![];
        while entries_to_send.len() < total_entries_to_send {
            let entry = Entry::new(&mut last_id, num_hashes, vec![]);
            last_id = entry.id;
            entries_to_send.push(entry);
        }

        assert!((num_ending_ticks as u64) < bootstrap_height);

        // Add the entries that weren't ticks onto the bootstrap height to get the
        // total expected entry length
        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
        let expected_entry_height =
            bootstrap_height + initial_non_tick_height + active_set_entries_len;
        let expected_last_id = entries_to_send[leader_rotation_index].id;
        entry_sender.send(entries_to_send.clone()).unwrap();

        // Wait for replicate_stage to exit and check that the return value is correct
        assert_eq!(
            Some(ReplicateStageReturnType::LeaderRotation(
                bootstrap_height,
                expected_entry_height,
                expected_last_id,
            )),
            replicate_stage.join().expect("replicate stage join")
        );

        // Check that the entries on the ledger writer channel are correct
        let received_ticks = ledger_writer_recv
            .recv()
            .expect("Expected to receive an entry on the ledger writer receiver");
        assert_eq!(
            &received_ticks[..],
            &entries_to_send[..leader_rotation_index + 1]
        );

        assert_eq!(exit.load(Ordering::Relaxed), true);
        let _ignored = remove_dir_all(&my_ledger_path);
    }

    #[test]
    fn test_vote_error_replicate_stage_correctness() {
        // Set up dummy node to host a ReplicateStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let my_node = Node::new_localhost_with_pubkey(my_id);

        // Create keypair for the leader
        let leader_id = Keypair::new().pubkey();

        let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
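        // The default LeaderScheduler is not expected to ever rotate leadership to
        // this validator, so the stage below should run until the entry sender is
        // dropped at the end of the test.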
        let num_ending_ticks = 0;
        let (_, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
            "test_vote_error_replicate_stage_correctness",
            10_000,
            num_ending_ticks,
            leader_id,
            500,
        );
        let initial_entry_len = genesis_entries.len();

        // Set up the bank
        let (bank, _, last_entry_id) =
            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);

        // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));

        // Set up the replicate stage
        let vote_account_keypair = Arc::new(Keypair::new());
        let bank = Arc::new(bank);
        let (entry_sender, entry_receiver) = channel();
        let exit = Arc::new(AtomicBool::new(false));
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
            Arc::new(my_keypair),
            vote_account_keypair.clone(),
            bank.clone(),
            cluster_info_me.clone(),
            entry_receiver,
            exit.clone(),
            initial_entry_len as u64,
            last_entry_id,
        );

        // Vote sender should error because no leader contact info is found in the
        // ClusterInfo
        let (mock_sender, _mock_receiver) = channel();
        let vote_err =
            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
        if let Err(Error::VoteError(vote_error)) = vote_err {
            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
        } else {
            panic!("Expected validator vote to fail with LeaderInfoNotFound");
        }

        // Send ReplicateStage an entry, should see it on the ledger writer receiver
        let next_tick = create_ticks(
            1,
            genesis_entries
                .last()
                .expect("Expected nonzero number of entries in genesis")
                .id,
        );
        entry_sender
            .send(next_tick.clone())
            .expect("Error sending entry to ReplicateStage");
        let received_tick = ledger_writer_recv
            .recv()
            .expect("Expected to receive an entry on the ledger writer receiver");
        assert_eq!(next_tick, received_tick);

        drop(entry_sender);
        replicate_stage
            .join()
            .expect("Expect successful ReplicateStage exit");
        let _ignored = remove_dir_all(&my_ledger_path);
    }

    #[test]
    fn test_vote_error_replicate_stage_leader_rotation() {
        // Set up dummy node to host a ReplicateStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let my_node = Node::new_localhost_with_pubkey(my_id);

        // Create keypair for the leader
        let leader_id = Keypair::new().pubkey();

        // Create the ledger
        let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
            "test_vote_error_replicate_stage_leader_rotation",
            10_000,
            0,
            leader_id,
            500,
        );

        let mut last_id = genesis_entries
            .last()
            .expect("expected at least one genesis entry")
            .id;

        // Write two entries to the ledger so that the validator is in the active set:
        // 1) Give the validator a nonzero number of tokens, 2) a vote from the validator.
        // This will cause leader rotation after the bootstrap height.
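        // Same active-set setup as test_replicate_stage_leader_rotation_exit above,
        // but this test feeds entries one at a time so each one can be checked on
        // the ledger writer channel before rotation occurs.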
        let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
        let (active_set_entries, vote_account_keypair) =
            make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0);
        last_id = active_set_entries.last().unwrap().id;
        let initial_tick_height = genesis_entries
            .iter()
            .skip(2)
            .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
        let active_set_entries_len = active_set_entries.len() as u64;
        let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
        let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
        ledger_writer.write_entries(&active_set_entries).unwrap();

        // Set up the LeaderScheduler so that this node becomes the leader at
        // bootstrap_height = num_bootstrap_slots * leader_rotation_interval
        let leader_rotation_interval = 10;
        let num_bootstrap_slots = 2;
        let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
        let leader_scheduler_config = LeaderSchedulerConfig::new(
            Some(bootstrap_height),
            Some(leader_rotation_interval),
            Some(leader_rotation_interval * 2),
            Some(bootstrap_height),
        );

        let leader_scheduler =
            Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));

        // Set up the bank
        let (bank, _, last_entry_id) =
            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);

        // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));

        // Set up the replicate stage
        let vote_account_keypair = Arc::new(vote_account_keypair);
        let bank = Arc::new(bank);
        let (entry_sender, entry_receiver) = channel();
        let exit = Arc::new(AtomicBool::new(false));
        let (replicate_stage, ledger_writer_recv) = ReplicateStage::new(
            Arc::new(my_keypair),
            vote_account_keypair.clone(),
            bank.clone(),
            cluster_info_me.clone(),
            entry_receiver,
            exit.clone(),
            initial_entry_len as u64,
            last_entry_id,
        );

        // Vote sender should error because no leader contact info is found in the
        // ClusterInfo
        let (mock_sender, _mock_receiver) = channel();
        let vote_err =
            send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender);
        if let Err(Error::VoteError(vote_error)) = vote_err {
            assert_eq!(vote_error, VoteError::LeaderInfoNotFound);
        } else {
            panic!("Expected validator vote to fail with LeaderInfoNotFound");
        }

        // Send enough ticks to trigger leader rotation
        let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
        let num_hashes = 1;

        // Add the entries that weren't ticks onto the bootstrap height to get the
        // total expected entry length
        let expected_entry_height =
            bootstrap_height + initial_non_tick_height + active_set_entries_len;
        let leader_rotation_index = (bootstrap_height - initial_tick_height - 1) as usize;
        let mut expected_last_id = Hash::default();
        for i in 0..total_entries_to_send {
            let entry = Entry::new(&mut last_id, num_hashes, vec![]);
            last_id = entry.id;
            entry_sender
                .send(vec![entry.clone()])
                .expect("Expected to be able to send entry to ReplicateStage");
            // Check that the entries on the ledger writer channel are correct
            let received_entry = ledger_writer_recv
                .recv()
                .expect("Expected to receive an entry on the ledger writer receiver");
            assert_eq!(received_entry[0], entry);

            if i == leader_rotation_index {
                expected_last_id = entry.id;
            }
        }

        assert_ne!(expected_last_id, Hash::default());
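
        // The entry at leader_rotation_index has been processed by now, so the
        // stage should have observed that this node is the scheduled leader and
        // exited with a LeaderRotation return value.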
        // Wait for replicate_stage to exit and check that the return value is correct
        assert_eq!(
            Some(ReplicateStageReturnType::LeaderRotation(
                bootstrap_height,
                expected_entry_height,
                expected_last_id,
            )),
            replicate_stage.join().expect("replicate stage join")
        );
        assert_eq!(exit.load(Ordering::Relaxed), true);
        let _ignored = remove_dir_all(&my_ledger_path);
    }

    #[test]
    fn test_replicate_stage_poh_error_entry_receiver() {
        // Set up dummy node to host a ReplicateStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let vote_keypair = Keypair::new();
        let my_node = Node::new_localhost_with_pubkey(my_id);

        // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
        let (entry_sender, entry_receiver) = channel();
        let (ledger_entry_sender, _ledger_entry_receiver) = channel();
        let mut last_entry_id = Hash::default();

        // Create keypair for the old leader
        let old_leader_id = Keypair::new().pubkey();
        let (_, my_ledger_path, _) = create_tmp_sample_ledger(
            "test_replicate_stage_poh_error_entry_receiver",
            10_000,
            0,
            old_leader_id,
            500,
        );

        let mut entry_height = 0;
        let mut last_id = Hash::default();
        let mut entries = Vec::new();
        for _ in 0..5 {
            let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks
            last_id = entry.id;
            entries.push(entry);
        }
        entry_sender
            .send(entries.clone())
            .expect("Expected to be able to send entries to ReplicateStage");
        let res = ReplicateStage::replicate_requests(
            &Arc::new(Bank::default()),
            &cluster_info_me,
            &entry_receiver,
            &Arc::new(my_keypair),
            &Arc::new(vote_keypair),
            None,
            &ledger_entry_sender,
            &mut entry_height,
            &mut last_entry_id,
        );
        match res {
            Ok(_) => (),
            Err(e) => assert!(false, "Entries were not processed correctly: {:?}", e),
        }

        entries.clear();
        for _ in 0..5 {
            let entry = Entry::new(&mut Hash::default(), 0, vec![]); // just broken entries
            entries.push(entry);
        }
        entry_sender
            .send(entries.clone())
            .expect("Expected to be able to send entries to ReplicateStage");
        let res = ReplicateStage::replicate_requests(
            &Arc::new(Bank::default()),
            &cluster_info_me,
            &entry_receiver,
            &Arc::new(Keypair::new()),
            &Arc::new(Keypair::new()),
            None,
            &ledger_entry_sender,
            &mut entry_height,
            &mut last_entry_id,
        );
        match res {
            Ok(_) => assert!(false, "Should have failed because the entries are broken"),
            Err(Error::BlobError(BlobError::VerificationFailed)) => (),
            Err(e) => assert!(
                false,
                "Should have failed with a blob error, instead got {:?}",
                e
            ),
        }
        let _ignored = remove_dir_all(&my_ledger_path);
    }
}