Replace rayon with threads for dynamic network test (#745)

This commit is contained in:
pgarg66 2018-07-24 17:54:29 -07:00 committed by GitHub
parent 6bd18e18ea
commit 9daa7bdbe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 67 additions and 31 deletions

View File

@ -1,11 +1,9 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate bincode; extern crate bincode;
extern crate rayon;
extern crate serde_json; extern crate serde_json;
extern crate solana; extern crate solana;
use rayon::prelude::*;
use solana::crdt::TestNode; use solana::crdt::TestNode;
use solana::crdt::{Crdt, NodeInfo}; use solana::crdt::{Crdt, NodeInfo};
use solana::entry_writer::EntryWriter; use solana::entry_writer::EntryWriter;
@ -22,6 +20,7 @@ use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::thread::Builder;
use std::time::Duration; use std::time::Duration;
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> { fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
@ -375,6 +374,7 @@ fn test_multi_node_dynamic_network() {
let leader = TestNode::new_localhost(); let leader = TestNode::new_localhost();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(100_000); let (alice, ledger_path) = genesis(100_000);
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let server = FullNode::new( let server = FullNode::new(
leader, leader,
@ -384,50 +384,86 @@ fn test_multi_node_dynamic_network() {
None, None,
); );
info!("{:x} LEADER", leader_data.debug_id()); info!("{:x} LEADER", leader_data.debug_id());
let leader_balance = let leader_balance = send_tx_and_retry_get_balance(
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); &leader_data,
&alice_arc.read().unwrap(),
&bob_pubkey,
Some(500),
).unwrap();
assert_eq!(leader_balance, 500); assert_eq!(leader_balance, 500);
let leader_balance = let leader_balance = send_tx_and_retry_get_balance(
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); &leader_data,
&alice_arc.read().unwrap(),
&bob_pubkey,
Some(1000),
).unwrap();
assert_eq!(leader_balance, 1000); assert_eq!(leader_balance, 1000);
let keypairs: Vec<(KeyPair)> = (0..N) let t1: Vec<_> = (0..N)
.into_par_iter() .into_iter()
.map(|n| { .map(|n| {
let keypair = KeyPair::new(); let leader_data = leader_data.clone();
//send some tokens to the new validator let alice_clone = alice_arc.clone();
let bal = Builder::new()
send_tx_and_retry_get_balance(&leader_data, &alice, &keypair.pubkey(), Some(500)); .name("keypair-thread".to_string())
assert_eq!(bal, Some(500)); .spawn(move || {
info!("sent balance to[{}/{}] {:x}", n, N, keypair.pubkey()); info!("Spawned thread {}", n);
keypair let keypair = KeyPair::new();
//send some tokens to the new validator
let bal = send_tx_and_retry_get_balance(
&leader_data,
&alice_clone.read().unwrap(),
&keypair.pubkey(),
Some(500),
);
assert_eq!(bal, Some(500));
info!("sent balance to[{}/{}] {:x}", n, N, keypair.pubkey());
keypair
})
.unwrap()
}) })
.collect(); .collect();
let validators: Vec<(NodeInfo, FullNode)> = keypairs info!("Waiting for keypairs to be created");
.into_par_iter() let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
info!("keypairs created");
let t2: Vec<_> = keypairs
.into_iter()
.map(|keypair| { .map(|keypair| {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let leader_data = leader_data.clone();
let rd = validator.data.clone(); let ledger_path = ledger_path.clone();
info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id()); Builder::new()
let val = FullNode::new( .name("validator-launch-thread".to_string())
validator, .spawn(move || {
false, let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
LedgerFile::Path(ledger_path.clone()), let rd = validator.data.clone();
Some(keypair), info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id());
Some(leader_data.contact_info.ncp), let val = FullNode::new(
); validator,
(rd, val) false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
Some(leader_data.contact_info.ncp),
);
(rd, val)
})
.unwrap()
}) })
.collect(); .collect();
let validators: Vec<_> = t2.into_iter().map(|t| t.join().unwrap()).collect();
let mut consecutive_success = 0; let mut consecutive_success = 0;
for i in 0..N { for i in 0..N {
//verify leader can do transfer //verify leader can do transfer
let expected = ((i + 3) * 500) as i64; let expected = ((i + 3) * 500) as i64;
let leader_balance = let leader_balance = send_tx_and_retry_get_balance(
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) &leader_data,
.unwrap(); &alice_arc.read().unwrap(),
&bob_pubkey,
Some(expected),
).unwrap();
if leader_balance != expected { if leader_balance != expected {
info!( info!(
"leader dropped transaction {} {:?} {:?}", "leader dropped transaction {} {:?} {:?}",