Expose fewer exit variables

This commit is contained in:
Greg Fitzgerald 2018-07-09 14:53:18 -06:00 committed by Greg Fitzgerald
parent 0ee86ff313
commit c65c0d9b23
8 changed files with 89 additions and 110 deletions

View File

@ -14,8 +14,6 @@ use solana::service::Service;
use std::fs::File; use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit; use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
//use std::time::Duration; //use std::time::Duration;
fn main() -> () { fn main() -> () {
@ -68,11 +66,10 @@ fn main() -> () {
} }
} }
let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr);
let exit = Arc::new(AtomicBool::new(false));
let fullnode = if let Some(t) = matches.value_of("testnet") { let fullnode = if let Some(t) = matches.value_of("testnet") {
let testnet_address_string = t.to_string(); let testnet_address_string = t.to_string();
let testnet_addr = testnet_address_string.parse().unwrap(); let testnet_addr = testnet_address_string.parse().unwrap();
FullNode::new(node, false, InFile::StdIn, Some(testnet_addr), None, exit) FullNode::new(node, false, InFile::StdIn, Some(testnet_addr), None)
} else { } else {
node.data.current_leader_id = node.data.id.clone(); node.data.current_leader_id = node.data.id.clone();
@ -81,7 +78,7 @@ fn main() -> () {
} else { } else {
OutFile::StdOut OutFile::StdOut
}; };
FullNode::new(node, true, InFile::StdIn, None, Some(outfile), exit) FullNode::new(node, true, InFile::StdIn, None, Some(outfile))
}; };
fullnode.join().expect("join"); fullnode.join().expect("join");
} }

View File

@ -3,13 +3,14 @@
use packet::BlobRecycler; use packet::BlobRecycler;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use streamer::{self, BlobReceiver}; use streamer::{self, BlobReceiver};
pub struct BlobFetchStage { pub struct BlobFetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
@ -39,7 +40,11 @@ impl BlobFetchStage {
}) })
.collect(); .collect();
(BlobFetchStage { thread_hdls }, blob_receiver) (BlobFetchStage { exit, thread_hdls }, blob_receiver)
}
pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
} }
} }

View File

@ -3,13 +3,14 @@
use packet::PacketRecycler; use packet::PacketRecycler;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use streamer::{self, PacketReceiver}; use streamer::{self, PacketReceiver};
pub struct FetchStage { pub struct FetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
@ -39,7 +40,11 @@ impl FetchStage {
}) })
.collect(); .collect();
(FetchStage { thread_hdls }, packet_receiver) (FetchStage { exit, thread_hdls }, packet_receiver)
}
pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
} }
} }

View File

@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader}; use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{JoinHandle, Result}; use std::thread::{JoinHandle, Result};
use std::time::Duration; use std::time::Duration;
@ -24,6 +24,7 @@ use tvu::Tvu;
//use std::time::Duration; //use std::time::Duration;
pub struct FullNode { pub struct FullNode {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
@ -44,7 +45,6 @@ impl FullNode {
infile: InFile, infile: InFile,
network_entry_for_validator: Option<SocketAddr>, network_entry_for_validator: Option<SocketAddr>,
outfile_for_leader: Option<OutFile>, outfile_for_leader: Option<OutFile>,
exit: Arc<AtomicBool>,
) -> FullNode { ) -> FullNode {
info!("creating bank..."); info!("creating bank...");
let bank = Bank::default(); let bank = Bank::default();
@ -70,6 +70,7 @@ impl FullNode {
local_gossip_addr, node.data.contact_info.ncp local_gossip_addr, node.data.contact_info.ncp
); );
let requests_addr = node.data.contact_info.rpu.clone(); let requests_addr = node.data.contact_info.rpu.clone();
let exit = Arc::new(AtomicBool::new(false));
if !leader { if !leader {
let testnet_addr = network_entry_for_validator.expect("validator requires entry"); let testnet_addr = network_entry_for_validator.expect("validator requires entry");
@ -215,7 +216,7 @@ impl FullNode {
); );
thread_hdls.extend(vec![t_broadcast]); thread_hdls.extend(vec![t_broadcast]);
FullNode { thread_hdls } FullNode { exit, thread_hdls }
} }
/// Create a server instance acting as a validator. /// Create a server instance acting as a validator.
@ -294,7 +295,12 @@ impl FullNode {
); );
thread_hdls.extend(tvu.thread_hdls()); thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls()); thread_hdls.extend(ncp.thread_hdls());
FullNode { thread_hdls } FullNode { exit, thread_hdls }
}
pub fn close(self) -> Result<()> {
self.exit.store(true, Ordering::Relaxed);
self.join()
} }
} }
@ -317,7 +323,7 @@ mod tests {
use crdt::TestNode; use crdt::TestNode;
use fullnode::FullNode; use fullnode::FullNode;
use mint::Mint; use mint::Mint;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
fn validator_exit() { fn validator_exit() {
@ -326,10 +332,7 @@ mod tests {
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone(); let entry = tn.data.clone();
let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone()); let v = FullNode::new_validator(bank, 0, None, tn, entry, exit);
exit.store(true, Ordering::Relaxed); v.close().unwrap();
for t in v.thread_hdls {
t.join().unwrap();
}
} }
} }

View File

@ -5,13 +5,14 @@ use packet::{BlobRecycler, SharedBlob};
use result::Result; use result::Result;
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
use streamer; use streamer;
pub struct Ncp { pub struct Ncp {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
@ -47,9 +48,14 @@ impl Ncp {
response_sender.clone(), response_sender.clone(),
exit.clone(), exit.clone(),
); );
let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit.clone());
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(Ncp { thread_hdls }) Ok(Ncp { exit, thread_hdls })
}
pub fn close(self) -> thread::Result<()> {
self.exit.store(true, Ordering::Relaxed);
self.join()
} }
} }
@ -70,7 +76,7 @@ impl Service for Ncp {
mod tests { mod tests {
use crdt::{Crdt, TestNode}; use crdt::{Crdt, TestNode};
use ncp::Ncp; use ncp::Ncp;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
#[test] #[test]
@ -89,9 +95,6 @@ mod tests {
tn.sockets.gossip_send, tn.sockets.gossip_send,
exit.clone(), exit.clone(),
).unwrap(); ).unwrap();
exit.store(true, Ordering::Relaxed); d.close().expect("thread join");
for t in d.thread_hdls {
t.join().expect("thread join");
}
} }
} }

View File

@ -87,6 +87,11 @@ impl Tpu {
}; };
(tpu, blob_receiver) (tpu, blob_receiver)
} }
pub fn close(self) -> thread::Result<()> {
self.fetch_stage.close();
self.join()
}
} }
impl Service for Tpu { impl Service for Tpu {

View File

@ -101,6 +101,11 @@ impl Tvu {
window_stage, window_stage,
} }
} }
pub fn close(self) -> thread::Result<()> {
self.fetch_stage.close();
self.join()
}
} }
impl Service for Tvu { impl Service for Tvu {
@ -136,7 +141,7 @@ pub mod tests {
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
@ -279,8 +284,7 @@ pub mod tests {
let bob_balance = bank.get_balance(&bob_keypair.pubkey()); let bob_balance = bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); tvu.close().expect("close");
tvu.join().expect("join");
dr_l.0.join().expect("join"); dr_l.0.join().expect("join");
dr_2.0.join().expect("join"); dr_2.0.join().expect("join");
dr_1.0.join().expect("join"); dr_1.0.join().expect("join");

View File

@ -11,14 +11,13 @@ use solana::fullnode::{FullNode, InFile, OutFile};
use solana::logger; use solana::logger;
use solana::mint::Mint; use solana::mint::Mint;
use solana::ncp::Ncp; use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window; use solana::streamer::default_window;
use solana::thin_client::ThinClient; use solana::thin_client::ThinClient;
use std::fs::File; use std::fs::File;
use std::mem; use std::mem;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
@ -36,7 +35,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> {
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window(); let spy_window = default_window();
let dr = Ncp::new( let ncp = Ncp::new(
spy_ref.clone(), spy_ref.clone(),
spy_window, spy_window,
spy.sockets.gossip, spy.sockets.gossip,
@ -66,8 +65,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> {
sleep(Duration::new(1, 0)); sleep(Duration::new(1, 0));
} }
assert!(converged); assert!(converged);
exit.store(true, Ordering::Relaxed); ncp.close().unwrap();
dr.join().unwrap();
rv rv
} }
@ -92,18 +90,10 @@ fn test_multi_node_validator_catchup_from_zero() {
let leader = TestNode::new(); let leader = TestNode::new();
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (alice, ledger_path) = genesis(10_000); let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new( let server = FullNode::new(leader, true, InFile::Path(ledger_path.clone()), None, None);
leader, let mut nodes = vec![server];
true,
InFile::Path(ledger_path.clone()),
None,
None,
exit.clone(),
);
let mut threads = server.thread_hdls();
for _ in 0..N { for _ in 0..N {
let validator = TestNode::new(); let validator = TestNode::new();
let mut val = FullNode::new( let mut val = FullNode::new(
@ -112,9 +102,8 @@ fn test_multi_node_validator_catchup_from_zero() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
None, None,
exit.clone(),
); );
threads.append(&mut val.thread_hdls()); nodes.push(val);
} }
let servers = converge(&leader_data, N + 1); let servers = converge(&leader_data, N + 1);
//contains the leader addr as well //contains the leader addr as well
@ -145,9 +134,8 @@ fn test_multi_node_validator_catchup_from_zero() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
None, None,
exit.clone(),
); );
threads.append(&mut val.thread_hdls()); nodes.push(val);
//contains the leader and new node //contains the leader and new node
let servers = converge(&leader_data, N + 2); let servers = converge(&leader_data, N + 2);
@ -180,9 +168,8 @@ fn test_multi_node_validator_catchup_from_zero() {
} }
assert_eq!(success, servers.len()); assert_eq!(success, servers.len());
exit.store(true, Ordering::Relaxed); for node in nodes {
for t in threads { node.close().unwrap();
t.join().unwrap();
} }
} }
@ -194,27 +181,19 @@ fn test_multi_node_basic() {
let leader = TestNode::new(); let leader = TestNode::new();
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (alice, ledger_path) = genesis(10_000); let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new( let server = FullNode::new(leader, true, InFile::Path(ledger_path.clone()), None, None);
leader, let mut nodes = vec![server];
true,
InFile::Path(ledger_path.clone()),
None,
None,
exit.clone(),
);
let threads = server.thread_hdls();
for _ in 0..N { for _ in 0..N {
let validator = TestNode::new(); let validator = TestNode::new();
FullNode::new( let val = FullNode::new(
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
None, None,
exit.clone(),
); );
nodes.push(val);
} }
let servers = converge(&leader_data, N + 1); let servers = converge(&leader_data, N + 1);
//contains the leader addr as well //contains the leader addr as well
@ -236,9 +215,8 @@ fn test_multi_node_basic() {
} }
assert_eq!(success, servers.len()); assert_eq!(success, servers.len());
exit.store(true, Ordering::Relaxed); for node in nodes {
for t in threads { node.close().unwrap();
t.join().unwrap();
} }
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
} }
@ -248,7 +226,6 @@ fn test_boot_validator_from_file() {
logger::setup(); logger::setup();
let leader = TestNode::new(); let leader = TestNode::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (alice, ledger_path) = genesis(100_000); let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new( let leader_fullnode = FullNode::new(
@ -257,7 +234,6 @@ fn test_boot_validator_from_file() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
None, None,
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(),
); );
let leader_balance = let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
@ -274,42 +250,36 @@ fn test_boot_validator_from_file() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
None, None,
exit.clone(),
); );
let mut client = mk_client(&validator_data); let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
assert!(getbal == Some(leader_balance)); assert!(getbal == Some(leader_balance));
exit.store(true, Ordering::Relaxed); leader_fullnode.close().unwrap();
leader_fullnode.join().unwrap(); val_fullnode.close().unwrap();
val_fullnode.join().unwrap();
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
} }
fn restart_leader( fn restart_leader(
exit: Option<Arc<AtomicBool>>,
leader_fullnode: Option<FullNode>, leader_fullnode: Option<FullNode>,
ledger_path: String, ledger_path: String,
) -> (ReplicatedData, FullNode, Arc<AtomicBool>) { ) -> (ReplicatedData, FullNode) {
if let (Some(exit), Some(leader_fullnode)) = (exit, leader_fullnode) { if let Some(leader_fullnode) = leader_fullnode {
// stop the leader // stop the leader
exit.store(true, Ordering::Relaxed); leader_fullnode.close().unwrap();
leader_fullnode.join().unwrap();
} }
let leader = TestNode::new(); let leader = TestNode::new();
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let exit = Arc::new(AtomicBool::new(false));
let leader_fullnode = FullNode::new( let leader_fullnode = FullNode::new(
leader, leader,
true, true,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
None, None,
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(),
); );
(leader_data, leader_fullnode, exit) (leader_data, leader_fullnode)
} }
#[test] #[test]
@ -322,7 +292,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
let (alice, ledger_path) = genesis(100_000); let (alice, ledger_path) = genesis(100_000);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let (leader_data, leader_fullnode, exit) = restart_leader(None, None, ledger_path.clone()); let (leader_data, leader_fullnode) = restart_leader(None, ledger_path.clone());
// lengthen the ledger // lengthen the ledger
let leader_balance = let leader_balance =
@ -337,8 +307,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
.expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str()); .expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str());
// restart the leader // restart the leader
let (leader_data, leader_fullnode, exit) = let (leader_data, leader_fullnode) = restart_leader(Some(leader_fullnode), ledger_path.clone());
restart_leader(Some(exit), Some(leader_fullnode), ledger_path.clone());
// lengthen the ledger // lengthen the ledger
let leader_balance = let leader_balance =
@ -346,8 +315,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
assert_eq!(leader_balance, 1000); assert_eq!(leader_balance, 1000);
// restart the leader // restart the leader
let (leader_data, leader_fullnode, exit) = let (leader_data, leader_fullnode) = restart_leader(Some(leader_fullnode), ledger_path.clone());
restart_leader(Some(exit), Some(leader_fullnode), ledger_path.clone());
// start validator from old ledger // start validator from old ledger
let validator = TestNode::new(); let validator = TestNode::new();
@ -358,7 +326,6 @@ fn test_leader_restart_validator_start_from_old_ledger() {
InFile::Path(stale_ledger_path.clone()), InFile::Path(stale_ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
None, None,
exit.clone(),
); );
// trigger broadcast, validator should catch up from leader, whose window contains // trigger broadcast, validator should catch up from leader, whose window contains
@ -380,14 +347,13 @@ fn test_leader_restart_validator_start_from_old_ledger() {
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected));
assert_eq!(getbal, Some(expected)); assert_eq!(getbal, Some(expected));
exit.store(true, Ordering::Relaxed); leader_fullnode.close().unwrap();
leader_fullnode.join().unwrap(); val_fullnode.close().unwrap();
val_fullnode.join().unwrap();
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
std::fs::remove_file(stale_ledger_path).unwrap(); std::fs::remove_file(stale_ledger_path).unwrap();
} }
//TODO: this test will run a long time so its disabled for CI //TODO: this test will run a long time so it's disabled for CI
#[test] #[test]
#[ignore] #[ignore]
fn test_multi_node_dynamic_network() { fn test_multi_node_dynamic_network() {
@ -395,7 +361,6 @@ fn test_multi_node_dynamic_network() {
const N: usize = 25; const N: usize = 25;
let leader = TestNode::new(); let leader = TestNode::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (alice, ledger_path) = genesis(100_000); let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let server = FullNode::new( let server = FullNode::new(
@ -404,10 +369,8 @@ fn test_multi_node_dynamic_network() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
None, None,
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(),
); );
info!("{:x} LEADER", leader_data.debug_id()); info!("{:x} LEADER", leader_data.debug_id());
let threads = server.thread_hdls();
let leader_balance = let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
assert_eq!(leader_balance, 500); assert_eq!(leader_balance, 500);
@ -415,10 +378,9 @@ fn test_multi_node_dynamic_network() {
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap();
assert_eq!(leader_balance, 1000); assert_eq!(leader_balance, 1000);
let mut validators: Vec<(ReplicatedData, Arc<AtomicBool>, FullNode)> = (0..N) let mut validators: Vec<(ReplicatedData, FullNode)> = (0..N)
.into_iter() .into_iter()
.map(|_| { .map(|_| {
let exit = Arc::new(AtomicBool::new(false));
let validator = TestNode::new(); let validator = TestNode::new();
let rd = validator.data.clone(); let rd = validator.data.clone();
let val = FullNode::new( let val = FullNode::new(
@ -427,12 +389,12 @@ fn test_multi_node_dynamic_network() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(),
); );
info!("{:x} VALIDATOR", rd.debug_id()); info!("{:x} VALIDATOR", rd.debug_id());
(rd, exit, val) (rd, val)
}) })
.collect(); .collect();
for i in 0..N { for i in 0..N {
//verify leader can do transfer //verify leader can do transfer
let expected = ((i + 3) * 500) as i64; let expected = ((i + 3) * 500) as i64;
@ -475,7 +437,6 @@ fn test_multi_node_dynamic_network() {
} }
let val = { let val = {
let exit = Arc::new(AtomicBool::new(false));
let validator = TestNode::new(); let validator = TestNode::new();
let rd = validator.data.clone(); let rd = validator.data.clone();
let val = FullNode::new( let val = FullNode::new(
@ -484,10 +445,9 @@ fn test_multi_node_dynamic_network() {
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.contact_info.ncp), Some(leader_data.contact_info.ncp),
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(),
); );
info!("{:x} ADDED", rd.debug_id()); info!("{:x} ADDED", rd.debug_id());
(rd, exit, val) (rd, val)
}; };
let old_val = mem::replace(&mut validators[i], val); let old_val = mem::replace(&mut validators[i], val);
@ -495,19 +455,16 @@ fn test_multi_node_dynamic_network() {
// this should be almost true, or at least validators.len() - 1 while the other node catches up // this should be almost true, or at least validators.len() - 1 while the other node catches up
//assert!(success == validators.len()); //assert!(success == validators.len());
//kill a validator //kill a validator
old_val.1.store(true, Ordering::Relaxed); old_val.1.close().unwrap();
old_val.2.join().unwrap();
info!("{:x} KILLED", old_val.0.debug_id()); info!("{:x} KILLED", old_val.0.debug_id());
//add a new one //add a new one
} }
for (_, exit, val) in validators.into_iter() {
exit.store(true, Ordering::Relaxed); for (_, node) in validators {
val.join().unwrap(); node.close().unwrap();
}
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();
} }
server.close().unwrap();
std::fs::remove_file(ledger_path).unwrap(); std::fs::remove_file(ledger_path).unwrap();
} }