From 9daa7bdbe27060ab8df417a11eb50b11c8744770 Mon Sep 17 00:00:00 2001 From: pgarg66 Date: Tue, 24 Jul 2018 17:54:29 -0700 Subject: [PATCH] Replace rayon with threads for dynamic network test (#745) --- tests/multinode.rs | 98 +++++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/tests/multinode.rs b/tests/multinode.rs index 98786b52ed..9b22233a56 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1,11 +1,9 @@ #[macro_use] extern crate log; extern crate bincode; -extern crate rayon; extern crate serde_json; extern crate solana; -use rayon::prelude::*; use solana::crdt::TestNode; use solana::crdt::{Crdt, NodeInfo}; use solana::entry_writer::EntryWriter; @@ -22,6 +20,7 @@ use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::sleep; +use std::thread::Builder; use std::time::Duration; fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { @@ -375,6 +374,7 @@ fn test_multi_node_dynamic_network() { let leader = TestNode::new_localhost(); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(100_000); + let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.data.clone(); let server = FullNode::new( leader, @@ -384,50 +384,86 @@ fn test_multi_node_dynamic_network() { None, ); info!("{:x} LEADER", leader_data.debug_id()); - let leader_balance = - send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); + let leader_balance = send_tx_and_retry_get_balance( + &leader_data, + &alice_arc.read().unwrap(), + &bob_pubkey, + Some(500), + ).unwrap(); assert_eq!(leader_balance, 500); - let leader_balance = - send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); + let leader_balance = send_tx_and_retry_get_balance( + &leader_data, + &alice_arc.read().unwrap(), + &bob_pubkey, + Some(1000), + ).unwrap(); assert_eq!(leader_balance, 1000); - let keypairs: Vec<(KeyPair)> = (0..N) - .into_par_iter() + let t1: Vec<_> = (0..N) + .into_iter() .map(|n| { - let keypair = KeyPair::new(); - //send some tokens to the new validator - let bal = - send_tx_and_retry_get_balance(&leader_data, &alice, &keypair.pubkey(), Some(500)); - assert_eq!(bal, Some(500)); - info!("sent balance to[{}/{}] {:x}", n, N, keypair.pubkey()); - keypair + let leader_data = leader_data.clone(); + let alice_clone = alice_arc.clone(); + Builder::new() + .name("keypair-thread".to_string()) + .spawn(move || { + info!("Spawned thread {}", n); + let keypair = KeyPair::new(); + //send some tokens to the new validator + let bal = send_tx_and_retry_get_balance( + &leader_data, + &alice_clone.read().unwrap(), + &keypair.pubkey(), + Some(500), + ); + assert_eq!(bal, Some(500)); + info!("sent balance to[{}/{}] {:x}", n, N, keypair.pubkey()); + keypair + }) + .unwrap() }) .collect(); - let validators: Vec<(NodeInfo, FullNode)> = keypairs - .into_par_iter() + info!("Waiting for keypairs to be created"); + let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect(); + info!("keypairs created"); + + let t2: Vec<_> = keypairs + .into_iter() .map(|keypair| { - let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); - let rd = validator.data.clone(); - info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id()); - let val = FullNode::new( - validator, - false, - LedgerFile::Path(ledger_path.clone()), - Some(keypair), - Some(leader_data.contact_info.ncp), - ); - (rd, val) + let leader_data = leader_data.clone(); + let ledger_path = ledger_path.clone(); + Builder::new() + .name("validator-launch-thread".to_string()) + .spawn(move || { + let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let rd = validator.data.clone(); + info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id()); + let val = FullNode::new( + validator, + false, + LedgerFile::Path(ledger_path.clone()), + Some(keypair), + Some(leader_data.contact_info.ncp), + ); + (rd, val) + }) + .unwrap() }) .collect(); + let validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect(); + let mut consecutive_success = 0; for i in 0..N { //verify leader can do transfer let expected = ((i + 3) * 500) as i64; - let leader_balance = - send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) - .unwrap(); + let leader_balance = send_tx_and_retry_get_balance( + &leader_data, + &alice_arc.read().unwrap(), + &bob_pubkey, + Some(expected), + ).unwrap(); if leader_balance != expected { info!( "leader dropped transaction {} {:?} {:?}",