From bfe64f5f6e35772eaad66d279f04082e667e2cd7 Mon Sep 17 00:00:00 2001
From: Carl
Date: Fri, 14 Sep 2018 14:34:32 -0700
Subject: [PATCH] Added an integration test for transitioning a leader to a
 validator, verifying that the Tpu pipeline can exit and a Tvu can be
 restarted. Fixed the Tpu and broadcast stage so that exiting later stages
 in the pipeline also causes earlier stages to exit.

---
 src/banking_stage.rs   |  10 ++-
 src/broadcast_stage.rs |  47 ++++++++---
 src/fullnode.rs        |  29 ++++++-
 src/request_stage.rs   |   9 +--
 src/rpu.rs             |  15 +++-
 src/sigverify_stage.rs |  12 ++-
 src/tpu.rs             |  16 ++--
 tests/multinode.rs     | 176 +++++++++++++++++++++++++++++++++--------
 8 files changed, 253 insertions(+), 61 deletions(-)

diff --git a/src/banking_stage.rs b/src/banking_stage.rs
index f15f8caf7c..dd569a017c 100644
--- a/src/banking_stage.rs
+++ b/src/banking_stage.rs
@@ -49,7 +49,8 @@ impl BankingStage {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                         Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                        Error::SendError => break,
                         _ => error!("{:?}", e),
                     }
                 }
             }).unwrap();
@@ -108,7 +109,12 @@ impl BankingStage {
         debug!("process_transactions");
         let results = bank.process_transactions(transactions);
         let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
-        signal_sender.send(Signal::Transactions(transactions))?;
+        if signal_sender
+            .send(Signal::Transactions(transactions))
+            .is_err()
+        {
+            return Err(Error::SendError);
+        }
         debug!("done process_transactions");
 
         packet_recycler.recycle(msgs, "process_transactions");
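
A note on the shutdown mechanics above: BankingStage now maps a failed `send` to `Error::SendError` and breaks out of its loop instead of logging the error forever. The shutdown signal is the channel itself: once the downstream stage drops its receiver, every later `send` fails. A minimal, self-contained sketch of that pattern (illustrative only, not code from this patch):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (sender, receiver) = channel::<u64>();
    let producer = thread::spawn(move || {
        let mut sent = 0;
        loop {
            // A disconnected receiver makes `send` fail; treat that like
            // Error::SendError in the patch and exit instead of spinning.
            if sender.send(sent).is_err() {
                break;
            }
            sent += 1;
        }
        sent
    });

    // Consume a few items, then drop the receiver to simulate a later
    // pipeline stage (e.g. the WriteStage) shutting down first.
    for _ in 0..3 {
        receiver.recv().unwrap();
    }
    drop(receiver);

    // The producer notices the hangup and exits on its own.
    assert!(producer.join().unwrap() >= 3);
}
```
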
diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index bafad3486c..ecfb5b565a 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -12,7 +12,7 @@ use rayon::prelude::*;
 use result::{Error, Result};
 use service::Service;
 use std::net::UdpSocket;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
@@ -148,6 +148,24 @@ fn broadcast(
     Ok(())
 }
 
+// Implement a destructor for the BroadcastStage thread to signal that it
+// exited, even on panics
+struct Finalizer {
+    exit_sender: Arc<AtomicBool>,
+}
+
+impl Finalizer {
+    fn new(exit_sender: Arc<AtomicBool>) -> Self {
+        Finalizer { exit_sender }
+    }
+}
+
+// Implement a destructor for Finalizer: set the exit flag on drop.
+impl Drop for Finalizer {
+    fn drop(&mut self) {
+        self.exit_sender.store(true, Ordering::Relaxed);
+    }
+}
+
 pub struct BroadcastStage {
     thread_hdl: JoinHandle<BroadcastStageReturnType>,
 }
@@ -216,19 +234,30 @@ impl BroadcastStage {
     /// * `window` - Cache of blobs that we have broadcast
     /// * `recycler` - Blob recycler.
     /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
+    /// * `exit_sender` - Set to true when this stage exits, allowing the rest of the Tpu
+    ///    to exit cleanly. Otherwise, when a Tpu stage closes, it only closes the stages
+    ///    that come after it; the stages that come before could be blocked on a receive and
+    ///    would never notice they need to exit. With this flag, any stage closing eventually
+    ///    closes the WriteStage (the last stage in the pipeline), which closes the
+    ///    BroadcastStage, which sets this flag, which in turn closes the FetchStage and
+    ///    then the rest of the Tpu, completing the cycle.
     pub fn new(
         sock: UdpSocket,
         crdt: Arc<RwLock<Crdt>>,
         window: SharedWindow,
         entry_height: u64,
         recycler: BlobRecycler,
         receiver: Receiver<Vec<Entry>>,
+        exit_sender: Arc<AtomicBool>,
     ) -> Self {
         let thread_hdl = Builder::new()
             .name("solana-broadcaster".to_string())
-            .spawn(move || Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver))
+            .spawn(move || {
+                let _exit = Finalizer::new(exit_sender);
+                Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)
+            })
             .unwrap();
 
         BroadcastStage { thread_hdl }
     }
 }
@@ -252,6 +281,7 @@ mod tests {
     use service::Service;
     use signature::{Keypair, KeypairUtil, Pubkey};
     use std::cmp;
+    use std::sync::atomic::AtomicBool;
     use std::sync::mpsc::{channel, Sender};
     use std::sync::{Arc, RwLock};
     use window::{new_window_from_entries, SharedWindow};
@@ -293,7 +323,7 @@ mod tests {
         let shared_window = Arc::new(RwLock::new(window));
 
         let (entry_sender, entry_receiver) = channel();
-
+        let exit_sender = Arc::new(AtomicBool::new(false));
         // Start up the broadcast stage
         let broadcast_stage = BroadcastStage::new(
             leader_info.sockets.broadcast,
@@ -302,6 +332,7 @@ mod tests {
             entry_height,
             blob_recycler.clone(),
             entry_receiver,
+            exit_sender,
         );
 
         (
@@ -375,15 +406,13 @@ mod tests {
             };
         }
 
-        let highest_index = find_highest_window_index(&shared_window);
-
-        // TODO: 2 * LEADER_ROTATION_INTERVAL - 1 due to the same bug in
-        // index_blobs() as mentioned above
-        assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
         // Make sure the threads closed cleanly
         assert_eq!(
            broadcast_stage.join().unwrap(),
            BroadcastStageReturnType::LeaderRotation
         );
+
+        let highest_index = find_highest_window_index(&shared_window);
+        assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
     }
 }
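
The `Finalizer` introduced above relies on Rust's guarantee that destructors run during unwinding, so the exit flag gets set whether the broadcaster thread returns normally or panics. A standalone sketch of the same RAII idiom (names mirror the patch, but this snippet is illustrative, not the patch's code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Drop for Finalizer {
    // Runs on normal return and during panic unwinding alike.
    fn drop(&mut self) {
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let exit_clone = exit.clone();
    let handle = thread::spawn(move || {
        let _guard = Finalizer { exit_sender: exit_clone };
        panic!("stage died"); // the guard's destructor still fires
    });

    assert!(handle.join().is_err()); // the thread panicked...
    assert!(exit.load(Ordering::Relaxed)); // ...but the flag was still set
}
```
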
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 08b17e929a..c915ccdf6d 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -11,7 +11,7 @@ use packet::BlobRecycler;
 use rpc::{JsonRpcService, RPC_PORT};
 use rpu::Rpu;
 use service::Service;
-use signature::{Keypair, KeypairUtil};
+use signature::{Keypair, KeypairUtil, Pubkey};
 use std::net::UdpSocket;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -44,6 +44,10 @@ impl LeaderServices {
         self.broadcast_stage.join()?;
         self.tpu.join()
     }
+
+    pub fn exit(&self) {
+        self.tpu.exit();
+    }
 }
 
 pub struct ValidatorServices {
@@ -58,6 +62,10 @@ impl ValidatorServices {
     pub fn join(self) -> Result<()> {
         self.tvu.join()
     }
+
+    pub fn exit(&self) {
+        // TODO: implement exit for Tvu
+    }
 }
 
 pub enum FullnodeReturnType {
@@ -298,8 +306,7 @@ impl Fullnode {
             let tick_duration = None;
             // TODO: To light up PoH, uncomment the following line:
             //let tick_duration = Some(Duration::from_millis(1000));
-
-            let (tpu, entry_receiver) = Tpu::new(
+            let (tpu, entry_receiver, tpu_exit) = Tpu::new(
                 keypair.clone(),
                 &bank,
                 &crdt,
@@ -310,7 +317,6 @@ impl Fullnode {
                     .map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
                     .collect(),
                 &blob_recycler,
-                exit.clone(),
                 ledger_path,
                 sigverify_disabled,
                 entry_height,
@@ -326,6 +332,7 @@ impl Fullnode {
                 entry_height,
                 blob_recycler.clone(),
                 entry_receiver,
+                tpu_exit,
             );
             let leader_state = LeaderServices::new(tpu, broadcast_stage);
             node_role = Some(NodeRole::Leader(leader_state));
@@ -370,6 +377,7 @@ impl Fullnode {
             }
         }
 
+        self.rpu.set_new_bank(self.bank.clone());
         let tvu = Tvu::new(
             self.keypair.clone(),
             &self.bank,
@@ -414,6 +422,12 @@ impl Fullnode {
     //used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
+
+        match self.node_role {
+            Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(),
+            Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(),
+            _ => (),
+        }
     }
 
     pub fn close(self) -> Result<Option<FullnodeReturnType>> {
@@ -421,6 +435,13 @@ impl Fullnode {
         self.join()
     }
 
+    // TODO: only used for testing, get rid of this once we have actual
+    // leader scheduling
+    pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) {
+        let mut wcrdt = self.crdt.write().unwrap();
+        wcrdt.set_scheduled_leader(entry_height, leader_id);
+    }
+
     fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec<Entry>) {
         let bank = Bank::new_default(false);
         let entries = read_ledger(ledger_path, true).expect("opening ledger");
diff --git a/src/request_stage.rs b/src/request_stage.rs
index b40c3ebfd1..720a4ba07b 100644
--- a/src/request_stage.rs
+++ b/src/request_stage.rs
@@ -9,7 +9,7 @@ use result::{Error, Result};
 use service::Service;
 use std::net::SocketAddr;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 use streamer::{self, BlobReceiver, BlobSender};
@@ -17,7 +17,7 @@ use timing;
 
 pub struct RequestStage {
     thread_hdl: JoinHandle<()>,
-    pub request_processor: Arc<RequestProcessor>,
+    pub request_processor: Arc<RwLock<RequestProcessor>>,
 }
 
 impl RequestStage {
@@ -78,19 +78,18 @@ impl RequestStage {
         Ok(())
     }
     pub fn new(
-        request_processor: RequestProcessor,
+        request_processor: Arc<RwLock<RequestProcessor>>,
         packet_receiver: Receiver<SharedPackets>,
         packet_recycler: PacketRecycler,
         blob_recycler: BlobRecycler,
     ) -> (Self, BlobReceiver) {
-        let request_processor = Arc::new(request_processor);
         let request_processor_ = request_processor.clone();
         let (blob_sender, blob_receiver) = channel();
         let thread_hdl = Builder::new()
             .name("solana-request-stage".to_string())
             .spawn(move || loop {
                 if let Err(e) = Self::process_request_packets(
-                    &request_processor_,
+                    &request_processor_.read().unwrap(),
                     &packet_receiver,
                     &blob_sender,
                     &packet_recycler,
diff --git a/src/rpu.rs b/src/rpu.rs
index 7a634fb619..62cc5cd011 100644
--- a/src/rpu.rs
+++ b/src/rpu.rs
@@ -28,16 +28,18 @@ use packet::{BlobRecycler, PacketRecycler};
 use request_processor::RequestProcessor;
 use request_stage::RequestStage;
 use service::Service;
+use std::mem;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
 use std::sync::mpsc::channel;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::thread::{self, JoinHandle};
 use streamer;
 
 pub struct Rpu {
     request_stage: RequestStage,
     thread_hdls: Vec<JoinHandle<()>>,
+    request_processor: Arc<RwLock<RequestProcessor>>,
 }
 
 impl Rpu {
@@ -58,9 +60,9 @@ impl Rpu {
             packet_sender,
         );
 
-        let request_processor = RequestProcessor::new(bank.clone());
+        let request_processor = Arc::new(RwLock::new(RequestProcessor::new(bank.clone())));
         let (request_stage, blob_receiver) = RequestStage::new(
-            request_processor,
+            request_processor.clone(),
             packet_receiver,
             packet_recycler.clone(),
             blob_recycler.clone(),
@@ -74,11 +76,18 @@ impl Rpu {
         );
 
         let thread_hdls = vec![t_receiver, t_responder];
+
         Rpu {
             thread_hdls,
             request_stage,
+            request_processor,
         }
     }
+
+    pub fn set_new_bank(&self, new_bank: Arc<Bank>) {
+        let mut w_request_processor = self.request_processor.write().unwrap();
+        mem::replace(&mut *w_request_processor, RequestProcessor::new(new_bank));
+    }
 }
 
 impl Service for Rpu {
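
`Rpu::set_new_bank` above hot-swaps the `RequestProcessor` behind an `Arc<RwLock<..>>`, so the long-running request thread picks up the new bank on its next read without being restarted. A minimal sketch of that swap under assumed names (`State` stands in for `RequestProcessor`; the real code takes short read locks per packet batch):

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

struct State {
    bank_id: u64,
}

fn main() {
    let shared = Arc::new(RwLock::new(State { bank_id: 1 }));

    let worker_view = shared.clone();
    let worker = thread::spawn(move || {
        for _ in 0..50 {
            // Each iteration takes a fresh read lock, so a swapped-in
            // value becomes visible on the next pass.
            let _id = worker_view.read().unwrap().bank_id;
            thread::sleep(Duration::from_millis(1));
        }
    });

    // The equivalent of set_new_bank: replace the whole value in place.
    *shared.write().unwrap() = State { bank_id: 2 };

    worker.join().unwrap();
    assert_eq!(shared.read().unwrap().bank_id, 2);
}
```
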
diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs
index d0ec932ea2..d523fb5b2b 100644
--- a/src/sigverify_stage.rs
+++ b/src/sigverify_stage.rs
@@ -65,10 +65,15 @@ impl SigVerifyStage {
         );
 
         let verified_batch = Self::verify_batch(batch, sigverify_disabled);
-        sendr
+
+        if sendr
             .lock()
             .expect("lock in fn verify_batch in tpu")
-            .send(verified_batch)?;
+            .send(verified_batch)
+            .is_err()
+        {
+            return Err(Error::SendError);
+        }
 
         let total_time_ms = timing::duration_as_ms(&now.elapsed());
         let total_time_s = timing::duration_as_s(&now.elapsed());
@@ -105,6 +110,7 @@ impl SigVerifyStage {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
+                    Error::SendError => break,
                     _ => error!("{:?}", e),
                 }
             }
diff --git a/src/tpu.rs b/src/tpu.rs
index 4dd6555f8f..49d5eaa7c4 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -36,7 +36,7 @@ use service::Service;
 use signature::Keypair;
 use sigverify_stage::SigVerifyStage;
 use std::net::UdpSocket;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::Receiver;
 use std::sync::{Arc, RwLock};
 use std::thread;
@@ -53,6 +53,7 @@ pub struct Tpu {
     banking_stage: BankingStage,
     record_stage: RecordStage,
     write_stage: WriteStage,
+    exit: Arc<AtomicBool>,
 }
 
 impl Tpu {
@@ -63,16 +64,16 @@ impl Tpu {
         tick_duration: Option<Duration>,
         transactions_sockets: Vec<UdpSocket>,
         blob_recycler: &BlobRecycler,
-        exit: Arc<AtomicBool>,
         ledger_path: &str,
         sigverify_disabled: bool,
         entry_height: u64,
-    ) -> (Self, Receiver<Vec<Entry>>) {
+    ) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
+        let exit = Arc::new(AtomicBool::new(false));
         let mut packet_recycler = PacketRecycler::default();
         packet_recycler.set_name("tpu::Packet");
 
         let (fetch_stage, packet_receiver) =
-            FetchStage::new(transactions_sockets, exit, &packet_recycler);
+            FetchStage::new(transactions_sockets, exit.clone(), &packet_recycler);
 
         let (sigverify_stage, verified_receiver) =
             SigVerifyStage::new(packet_receiver, sigverify_disabled);
@@ -103,8 +104,13 @@ impl Tpu {
             banking_stage,
             record_stage,
             write_stage,
+            exit: exit.clone(),
         };
-        (tpu, entry_forwarder)
+        (tpu, entry_forwarder, exit)
+    }
+
+    pub fn exit(&self) {
+        self.exit.store(true, Ordering::Relaxed);
     }
 
     pub fn close(self) -> thread::Result<Option<TpuReturnType>> {
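
`Tpu::new` now creates the exit flag itself and returns it, so the `BroadcastStage` finalizer (and `Tpu::exit`) can trip it while the `FetchStage` plumbing observes it. A rough sketch of the flag-polling half of the cycle, assuming a receive loop that wakes on a timeout to check the flag (the real streamer internals differ):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let exit_flag = exit.clone();

    // Keep the sender alive so the channel stays connected; the loop
    // must exit via the flag, not via a disconnect.
    let (_sender, receiver) = channel::<u8>();

    let fetcher = thread::spawn(move || loop {
        if exit_flag.load(Ordering::Relaxed) {
            break; // another stage (or its Finalizer) requested shutdown
        }
        match receiver.recv_timeout(Duration::from_millis(100)) {
            Ok(_packet) => (), // forward the packet downstream
            Err(RecvTimeoutError::Timeout) => (),
            Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    exit.store(true, Ordering::Relaxed); // what Tpu::exit does
    fetcher.join().unwrap();
}
```
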
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 897d354d85..28a434bce8 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -5,10 +5,10 @@
 extern crate chrono;
 extern crate serde_json;
 extern crate solana;
 
-use solana::crdt::{Crdt, Node, NodeInfo};
+use solana::crdt::{Crdt, Node, NodeInfo, LEADER_ROTATION_INTERVAL};
 use solana::entry::Entry;
-use solana::fullnode::Fullnode;
+use solana::fullnode::{Fullnode, FullnodeReturnType};
 use solana::hash::Hash;
 use solana::ledger::LedgerWriter;
 use solana::logger;
@@ -30,28 +30,34 @@ use std::thread::sleep;
 use std::thread::Builder;
 use std::time::{Duration, Instant};
 
-fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
-    //lets spy on the network
+fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<Crdt>>, Pubkey) {
     let exit = Arc::new(AtomicBool::new(false));
     let mut spy = Node::new_localhost();
-    let daddr = "0.0.0.0:0".parse().unwrap();
     let me = spy.info.id.clone();
-    spy.info.contact_info.tvu = daddr;
-    spy.info.contact_info.rpu = daddr;
+    spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap();
+    spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap();
     let mut spy_crdt = Crdt::new(spy.info).expect("Crdt::new");
     spy_crdt.insert(&leader);
     spy_crdt.set_leader(leader.id);
-    let spy_ref = Arc::new(RwLock::new(spy_crdt));
+    let spy_crdt_ref = Arc::new(RwLock::new(spy_crdt));
     let spy_window = Arc::new(RwLock::new(default_window()));
     let recycler = BlobRecycler::default();
     let ncp = Ncp::new(
-        &spy_ref,
+        &spy_crdt_ref,
         spy_window,
         recycler,
         None,
         spy.sockets.gossip,
         exit.clone(),
     );
+
+    (ncp, spy_crdt_ref, me)
+}
+
+fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
+    //lets spy on the network
+    let (ncp, spy_ref, me) = make_spy_node(leader);
+
     //wait for the network to converge
     let mut converged = false;
     let mut rv = vec![];
@@ -85,15 +91,16 @@ fn tmp_ledger_path(name: &str) -> String {
     format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
 }
 
-fn genesis(name: &str, num: i64) -> (Mint, String) {
+fn genesis(name: &str, num: i64) -> (Mint, String, Vec<Entry>) {
     let mint = Mint::new(num);
 
     let path = tmp_ledger_path(name);
     let mut writer = LedgerWriter::open(&path, true).unwrap();
 
-    writer.write_entries(mint.create_entries()).unwrap();
+    let entries = mint.create_entries();
+    writer.write_entries(entries.clone()).unwrap();
 
-    (mint, path)
+    (mint, path, entries)
 }
 
 fn tmp_copy_ledger(from: &str, name: &str) -> String {
@@ -131,7 +138,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
     let bob_pubkey = Keypair::new().pubkey();
     let mut ledger_paths = Vec::new();
 
-    let (alice, leader_ledger_path) = genesis("multi_node_ledger_window", 10_000);
+    let (alice, leader_ledger_path, _) = genesis("multi_node_ledger_window", 10_000);
     ledger_paths.push(leader_ledger_path.clone());
 
     // make a copy at zero
@@ -151,7 +158,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
 
     // Send leader some tokens to vote
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
     info!("leader balance {}", leader_balance);
 
     // start up another validator from zero, converge and then check
@@ -173,7 +180,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
 
     // another transaction with leader
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
     info!("bob balance on leader {}", leader_balance);
     assert_eq!(leader_balance, 500);
 
@@ -211,7 +218,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
     let bob_pubkey = Keypair::new().pubkey();
     let mut ledger_paths = Vec::new();
 
-    let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000);
+    let (alice, leader_ledger_path, _) = genesis("multi_node_validator_catchup_from_zero", 10_000);
     ledger_paths.push(leader_ledger_path.clone());
 
     let zero_ledger_path = tmp_copy_ledger(
@@ -224,7 +231,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
 
     // Send leader some tokens to vote
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
     info!("leader balance {}", leader_balance);
 
     let mut nodes = vec![server];
@@ -251,7 +258,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
     assert_eq!(servers.len(), N + 1);
     //verify leader can do transfer
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
     assert_eq!(leader_balance, 500);
     //verify validator has the same balance
     let mut success = 0usize;
@@ -284,7 +291,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
     let servers = converge(&leader_data, N + 2);
 
     let mut leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
     info!("leader balance {}", leader_balance);
     loop {
         let mut client = mk_client(&leader_data);
@@ -335,13 +342,13 @@ fn test_multi_node_basic() {
     let bob_pubkey = Keypair::new().pubkey();
     let mut ledger_paths = Vec::new();
 
-    let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
+    let (alice, leader_ledger_path, _) = genesis("multi_node_basic", 10_000);
     ledger_paths.push(leader_ledger_path.clone());
 
     let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
 
     // Send leader some tokens to vote
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, 500, None).unwrap();
     info!("leader balance {}", leader_balance);
 
     let mut nodes = vec![server];
@@ -364,7 +371,7 @@ fn test_multi_node_basic() {
     assert_eq!(servers.len(), N + 1);
     //verify leader can do transfer
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, None).unwrap();
     assert_eq!(leader_balance, 500);
     //verify validator has the same balance
     let mut success = 0usize;
@@ -393,17 +400,17 @@ fn test_boot_validator_from_file() -> result::Result<()> {
     let leader_keypair = Keypair::new();
     let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
     let bob_pubkey = Keypair::new().pubkey();
-    let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000);
+    let (alice, leader_ledger_path, _) = genesis("boot_validator_from_file", 100_000);
     let mut ledger_paths = Vec::new();
     ledger_paths.push(leader_ledger_path.clone());
 
     let leader_data = leader.info.clone();
     let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
     assert_eq!(leader_balance, 500);
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap();
     assert_eq!(leader_balance, 1000);
 
     let keypair = Keypair::new();
@@ -446,7 +453,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
     // ledger (currently up to WINDOW_SIZE entries)
     logger::setup();
 
-    let (alice, ledger_path) = genesis(
+    let (alice, ledger_path, _) = genesis(
         "leader_restart_validator_start_from_old_ledger",
         100_000 + 500 * solana::window_service::MAX_REPAIR_BACKOFF as i64,
     );
@@ -456,7 +463,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
 
     // lengthen the ledger
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
     assert_eq!(leader_balance, 500);
 
     // create a "stale" ledger by copying current ledger
@@ -471,7 +478,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
 
     // lengthen the ledger
     let leader_balance =
-        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
+        send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap();
     assert_eq!(leader_balance, 1000);
 
     // restart the leader
@@ -498,7 +505,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
     let mut client = mk_client(&validator_data);
     for _ in 0..solana::window_service::MAX_REPAIR_BACKOFF {
         let leader_balance =
-            send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected))
+            send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(expected))
                 .unwrap();
         assert_eq!(leader_balance, expected);
 
@@ -538,7 +545,7 @@ fn test_multi_node_dynamic_network() {
     let leader_pubkey = leader_keypair.pubkey().clone();
     let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
     let bob_pubkey = Keypair::new().pubkey();
-    let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000);
+    let (alice, leader_ledger_path, _) = genesis("multi_node_dynamic_network", 10_000_000);
 
     let mut ledger_paths = Vec::new();
     ledger_paths.push(leader_ledger_path.clone());
@@ -553,6 +560,7 @@ fn test_multi_node_dynamic_network() {
         &leader_data,
         &alice_arc.read().unwrap(),
         &leader_pubkey,
+        500,
         None,
     ).unwrap();
     info!("leader balance {}", leader_balance);
@@ -710,6 +718,111 @@ fn test_multi_node_dynamic_network() {
     }
 }
 
+#[test]
+fn test_leader_to_validator_transition() {
+    logger::setup();
+
+    // Make a dummy address to be the sink for this test's mock transactions
+    let bob_pubkey = Keypair::new().pubkey();
+
+    // Initialize the leader ledger. Make a mint and a genesis entry
+    // in the leader ledger
+    let (mint, leader_ledger_path, entries) = genesis(
+        "test_leader_to_validator_transition",
+        (3 * LEADER_ROTATION_INTERVAL) as i64,
+    );
+
+    let genesis_height = entries.len() as u64;
+    let mut ledger_paths = Vec::new();
+    ledger_paths.push(leader_ledger_path.clone());
+
+    // Start the leader node
+    let leader_keypair = Keypair::new();
+    let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
+    let leader_info = leader_node.info.clone();
+    let mut leader = Fullnode::new(
+        leader_node,
+        &leader_ledger_path,
+        leader_keypair,
+        None,
+        false,
+    );
+
+    // Set the next leader to be Bob
+    leader.set_scheduled_leader(bob_pubkey, LEADER_ROTATION_INTERVAL);
+
+    // Make an extra node for our leader to broadcast to, one that won't vote
+    // or otherwise change our leader's entry count
+    let (ncp, spy_node, me) = make_spy_node(&leader_info);
+
+    // Wait for the leader to see the spy node
+    let mut converged = false;
+    for _ in 0..30 {
+        let num = spy_node.read().unwrap().convergence();
+        let v: Vec<NodeInfo> = spy_node
+            .read()
+            .unwrap()
+            .table
+            .values()
+            .filter(|x| x.id != me)
+            .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
+            .cloned()
+            .collect();
+        // Only the leader (the sole node besides the spy) needs to see
+        // both nodes on the network
+        if num >= 2 && !v.is_empty() {
+            converged = true;
+            break;
+        }
+        sleep(Duration::new(1, 0));
+    }
+
+    assert!(converged);
+
+    let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 4, 1);
+
+    // Push the leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height,
+    // and make sure it stops.
+    assert!(genesis_height < LEADER_ROTATION_INTERVAL);
+    for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) {
+        let expected_balance = std::cmp::min(
+            LEADER_ROTATION_INTERVAL - genesis_height,
+            i - genesis_height,
+        );
+
+        send_tx_and_retry_get_balance(
+            &leader_info,
+            &mint,
+            &bob_pubkey,
+            1,
+            Some(expected_balance as i64),
+        );
+    }
+
+    // Wait for the leader to shut down its tpu and restart as a tvu
+    match leader.handle_role_transition().unwrap() {
+        Some(FullnodeReturnType::LeaderRotation) => (),
+        _ => panic!("Expected reason for exit to be leader rotation"),
+    }
+
+    // Query the now-validator to make sure it has the proper balances in its bank
+    // after the transition, even though we submitted "extra_transactions"
+    // transactions earlier
+    let mut leader_client = mk_client(&leader_info);
+
+    let expected_bal = LEADER_ROTATION_INTERVAL - genesis_height;
+    let bal = leader_client
+        .poll_get_balance(&bob_pubkey)
+        .expect("Expected success when polling newly transitioned validator for balance")
+        as u64;
+
+    assert_eq!(bal, expected_bal);
+
+    // Shut down
+    ncp.close().unwrap();
+    leader.close().unwrap();
+}
+
 fn mk_client(leader: &NodeInfo) -> ThinClient {
     let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
     requests_socket
@@ -751,6 +864,7 @@ fn send_tx_and_retry_get_balance(
     leader: &NodeInfo,
     alice: &Mint,
     bob_pubkey: &Pubkey,
+    transfer_amount: i64,
     expected: Option<i64>,
 ) -> Option<i64> {
     let mut client = mk_client(leader);
@@ -758,7 +872,7 @@ fn send_tx_and_retry_get_balance(
     let last_id = client.get_last_id();
     info!("executing leader transfer");
     let _sig = client
-        .transfer(500, &alice.keypair(), *bob_pubkey, &last_id)
+        .transfer(transfer_amount, &alice.keypair(), *bob_pubkey, &last_id)
         .unwrap();
     retry_get_balance(&mut client, bob_pubkey, expected)
 }