From 98b47d2540e0254147f924d6d6c21ab2a22b67a3 Mon Sep 17 00:00:00 2001 From: Carl Date: Wed, 12 Sep 2018 13:59:19 -0700 Subject: [PATCH] Added check in broadcast stage to exit after transmitting last blob before leader rotation. Also added tests --- src/bin/fullnode.rs | 15 ++++++++++++++- src/broadcast_stage.rs | 18 ++++++++++++++++-- src/fullnode.rs | 26 ++++++++++++++------------ src/ledger.rs | 15 +++++---------- src/write_stage.rs | 21 +-------------------- 5 files changed, 50 insertions(+), 45 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 0e1de91b6..b6dd80820 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -124,5 +124,18 @@ fn main() -> () { } } - fullnode.join().expect("to never happen"); + /*loop { + match fullnode.node_role { + NodeRole::Leader(leader_services) => { + // TODO: return an exit code that signals we should do a role switch + leader_services.join(); + //fullnode.start_tvu(); + }, + NodeRole::Validator(validator_services) => { + validator_services.join(); + } + } + }*/ + + let _ = fullnode.join(); } diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 9310e2798..adb213703 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -1,7 +1,7 @@ //! The `broadcast_stage` broadcasts data from a leader node to validators //! use counter::Counter; -use crdt::{Crdt, CrdtError, NodeInfo}; +use crdt::{Crdt, CrdtError, NodeInfo, LEADER_ROTATION_INTERVAL}; use entry::Entry; #[cfg(feature = "erasure")] use erasure; @@ -162,6 +162,19 @@ impl BroadcastStage { let mut receive_index = entry_height; let me = crdt.read().unwrap().my_data().clone(); loop { + if transmit_index.data % (LEADER_ROTATION_INTERVAL as u64) == 0 { + let rcrdt = crdt.read().unwrap(); + let my_id = rcrdt.my_data().id; + match rcrdt.get_scheduled_leader(transmit_index.data) { + Some(id) if id == my_id => (), + // If the leader stays in power for the next + // round as well, then we don't exit. Otherwise, exit. + _ => { + return; + } + } + } + let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( &me, @@ -207,7 +220,8 @@ impl BroadcastStage { .name("solana-broadcaster".to_string()) .spawn(move || { Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver); - }).unwrap(); + }) + .unwrap(); BroadcastStage { thread_hdl } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 5d7d0d782..e8aa6b143 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -39,7 +39,7 @@ impl LeaderServices { } } - fn join(self) -> Result<()> { + pub fn join(self) -> Result<()> { self.tpu.join()?; self.broadcast_stage.join() } @@ -54,7 +54,7 @@ impl ValidatorServices { ValidatorServices { tvu } } - fn join(self) -> Result<()> { + pub fn join(self) -> Result<()> { self.tvu.join() } } @@ -68,7 +68,7 @@ pub struct Fullnode { rpu: Rpu, rpc_service: JsonRpcService, ncp: Ncp, - pub node_role: NodeRole, + pub node_role: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -140,9 +140,9 @@ impl Fullnode { match leader_addr { Some(leader_addr) => { info!( - "validator ready... local request address: {} (advertising {}) connected to: {}", - local_requests_addr, requests_addr, leader_addr - ); + "validator ready... local request address: {} (advertising {}) connected to: {}", + local_requests_addr, requests_addr, leader_addr + ); } None => { info!( @@ -278,7 +278,7 @@ impl Fullnode { exit.clone(), ); let validator_state = ValidatorServices::new(tvu); - node_role = NodeRole::Validator(validator_state); + node_role = Some(NodeRole::Validator(validator_state)); } None => { // Start in leader mode. @@ -309,7 +309,7 @@ impl Fullnode { entry_receiver, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); - node_role = NodeRole::Leader(leader_state); + node_role = Some(NodeRole::Leader(leader_state)); } } @@ -340,13 +340,15 @@ impl Service for Fullnode { self.rpu.join()?; self.ncp.join()?; self.rpc_service.join()?; + match self.node_role { - NodeRole::Validator(validator_service) => { + Some(NodeRole::Validator(validator_service)) => { validator_service.join()?; - } - NodeRole::Leader(leader_service) => { + }, + Some(NodeRole::Leader(leader_service)) => { leader_service.join()?; - } + }, + _ => (), } // TODO: Case on join values above to determine if diff --git a/src/ledger.rs b/src/ledger.rs index ef308c0f0..321c961ff 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -12,9 +12,9 @@ use mint::Mint; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; -use signature::Pubkey; #[cfg(test)] use signature::{Keypair, KeypairUtil}; +use signature::Pubkey; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; @@ -449,7 +449,7 @@ impl Block for [Entry] { .flat_map(|entry| entry.transactions.iter().filter_map(Transaction::vote)) .collect() } -} +} pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result> { let mut entries: Vec = Vec::with_capacity(blobs.len()); @@ -549,14 +549,17 @@ pub fn next_entries( #[cfg(test)] pub fn tmp_ledger_path(name: &str) -> String { let keypair = Keypair::new(); + format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey()) } + #[cfg(test)] pub fn genesis(name: &str, num: i64) -> (Mint, String) { let mint = Mint::new(num); let path = tmp_ledger_path(name); let mut writer = LedgerWriter::open(&path, true).unwrap(); writer.write_entries(mint.create_entries()).unwrap(); + (mint, path) } @@ -574,14 +577,6 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::Transaction; - fn tmp_ledger_path(name: &str) -> String { - use std::env; - let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); - let keypair = Keypair::new(); - - format!("{}/tmp-ledger-{}-{}", out_dir, name, keypair.pubkey()) - } - #[test] fn test_verify_slice() { use logger; diff --git a/src/write_stage.rs b/src/write_stage.rs index 8fa7dc93d..bc10d4521 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -290,11 +290,8 @@ mod tests { use service::Service; use signature::{Keypair, KeypairUtil, Pubkey}; use std::fs::remove_dir_all; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; use write_stage::{WriteStage, WriteStageReturnType}; fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec) { @@ -310,7 +307,6 @@ mod tests { fn setup_dummy_write_stage() -> ( Pubkey, WriteStage, - Arc, Sender>, Receiver>, Arc>, @@ -347,11 +343,9 @@ mod tests { entry_height, ); - let exit_sender = Arc::new(AtomicBool::new(false)); ( id, write_stage, - exit_sender, entry_sender, write_stage_entry_receiver, crdt, @@ -366,7 +360,6 @@ mod tests { let ( id, write_stage, - exit_sender, entry_sender, _write_stage_entry_receiver, crdt, @@ -392,16 +385,6 @@ mod tests { entry_sender.send(new_entry).unwrap(); } - // Wait until at least LEADER_ROTATION_INTERVAL have been written to the ledger - loop { - sleep(Duration::from_secs(1)); - let (current_entry_height, _) = process_ledger(&leader_ledger_path, &bank); - - if current_entry_height == LEADER_ROTATION_INTERVAL { - break; - } - } - // Set the scheduled next leader in the crdt to some other node let leader2_keypair = Keypair::new(); let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); @@ -415,14 +398,12 @@ mod tests { // Input another LEADER_ROTATION_INTERVAL dummy entries one at a time, // which will take us past the point of the leader rotation. // The write_stage will see that it's no longer the leader after - // checking the crdt, and exit + // checking the schedule, and exit for _ in 0..LEADER_ROTATION_INTERVAL { let new_entry = recorder.record(vec![]); entry_sender.send(new_entry).unwrap(); } - // Make sure the threads closed cleanly - exit_sender.store(true, Ordering::Relaxed); assert_eq!( write_stage.join().unwrap(), WriteStageReturnType::LeaderRotation