From dac9775de05d334694994b5c79c150e4009389bf Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 16:54:03 -0600 Subject: [PATCH] Replace client-demo with multinode-demo --- Cargo.toml | 4 - src/bin/client-demo.rs | 209 +++++++++++++++++++++--------- src/bin/multinode-demo.rs | 264 -------------------------------------- 3 files changed, 149 insertions(+), 328 deletions(-) delete mode 100644 src/bin/multinode-demo.rs diff --git a/Cargo.toml b/Cargo.toml index 35a7f7587..efc2e2155 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,6 @@ license = "Apache-2.0" name = "solana-client-demo" path = "src/bin/client-demo.rs" -[[bin]] -name = "solana-multinode-demo" -path = "src/bin/multinode-demo.rs" - [[bin]] name = "solana-testnode" path = "src/bin/testnode.rs" diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 9d7ef41f6..78c62f8ea 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -9,14 +9,20 @@ use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; +use solana::crdt::{Crdt, ReplicatedData}; use solana::mint::MintDemo; -use solana::signature::{GenKeys, KeyPairUtil}; +use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; +use solana::streamer::default_window; use solana::thin_client::ThinClient; use solana::transaction::Transaction; use std::env; +use std::fs::File; use std::io::{stdin, Read}; use std::net::{SocketAddr, UdpSocket}; use std::process::exit; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; use std::thread::sleep; use std::time::Duration; use std::time::Instant; @@ -32,13 +38,19 @@ fn print_usage(program: &str, opts: Options) { fn main() { let mut threads = 4usize; - let mut server_addr: String = "127.0.0.1:8000".to_string(); - let mut requests_addr: String = "127.0.0.1:8010".to_string(); + let mut num_nodes = 10usize; + let mut leader = "leader.json".to_string(); let mut opts = Options::new(); - opts.optopt("s", "", "server address", "host:port"); + opts.optopt("l", "", "leader", "leader.json"); opts.optopt("c", "", "client address", "host:port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); + opts.optopt( + "n", + "", + "number of nodes to converge to", + &format!("{}", num_nodes), + ); opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { @@ -54,19 +66,32 @@ fn main() { print_usage(&program, opts); return; } - if matches.opt_present("s") { - server_addr = matches.opt_str("s").unwrap(); - } - if matches.opt_present("c") { - requests_addr = matches.opt_str("c").unwrap(); + if matches.opt_present("l") { + leader = matches.opt_str("l").unwrap(); } + let client_addr: Arc> = if matches.opt_present("c") { + let addr = matches.opt_str("c").unwrap().parse().unwrap(); + Arc::new(RwLock::new(addr)) + } else { + Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap())) + }; if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } + if matches.opt_present("n") { + num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); + } - let mut transactions_addr: SocketAddr = requests_addr.parse().unwrap(); - let requests_port = transactions_addr.port(); - transactions_addr.set_port(requests_port + 1); + let leader: ReplicatedData = read_leader(leader); + let signal = Arc::new(AtomicBool::new(false)); + let mut c_threads = vec![]; + let validators = converge( + &client_addr, + &leader, + signal.clone(), + num_nodes + 2, + &mut c_threads, + ); if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); @@ -85,23 +110,7 @@ fn main() { eprintln!("failed to parse json: {}", e); exit(1); }); - - println!("Binding to {}", requests_addr); - let requests_socket = UdpSocket::bind(&requests_addr).unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); - let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); - let requests_addr: SocketAddr = server_addr.parse().unwrap(); - let requests_port = requests_addr.port(); - let mut transactions_addr = requests_addr.clone(); - transactions_addr.set_port(requests_port + 3); - let mut client = ThinClient::new( - requests_addr, - requests_socket, - transactions_addr, - transactions_socket, - ); + let mut client = mk_client(&client_addr, &leader); println!("Get last ID..."); let last_id = client.get_last_id().wait().unwrap(); @@ -120,7 +129,7 @@ fn main() { .into_par_iter() .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) .collect(); - let mut duration = now.elapsed(); + let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let bsps = txs as f64 / ns as f64; let nsps = ns as f64 / txs as f64; @@ -130,46 +139,126 @@ fn main() { nsps / 1_000_f64 ); - let initial_tx_count = client.transaction_count(); - println!("initial count {}", initial_tx_count); + let first_count = client.transaction_count(); + println!("initial count {}", first_count); println!("Transfering {} transactions in {} batches", txs, threads); - let now = Instant::now(); let sz = transactions.len() / threads; let chunks: Vec<_> = transactions.chunks(sz).collect(); chunks.into_par_iter().for_each(|txs| { println!("Transferring 1 unit {} times... to", txs.len()); - let requests_addr: SocketAddr = server_addr.parse().unwrap(); - let mut requests_cb_addr = requests_addr.clone(); - requests_cb_addr.set_port(0); - let requests_socket = UdpSocket::bind(requests_cb_addr).unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); - let mut transactions_addr: SocketAddr = requests_addr.clone(); - transactions_addr.set_port(0); - let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); - let client = ThinClient::new( - requests_addr, - requests_socket, - transactions_addr, - transactions_socket, - ); + let client = mk_client(&client_addr, &leader); for tx in txs { client.transfer_signed(tx.clone()).unwrap(); } }); - println!("Waiting for transactions to complete...",); - let mut tx_count; - for _ in 0..10 { - tx_count = client.transaction_count(); - duration = now.elapsed(); - let txs = tx_count - initial_tx_count; - println!("Transactions processed {}", txs); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("{} tps", tps); - sleep(Duration::new(1, 0)); + println!("Sampling tps every second...",); + validators.into_par_iter().for_each(|val| { + let mut client = mk_client(&client_addr, &val); + let mut now = Instant::now(); + let mut initial_tx_count = client.transaction_count(); + for i in 0..100 { + let tx_count = client.transaction_count(); + let duration = now.elapsed(); + now = Instant::now(); + let sample = tx_count - initial_tx_count; + initial_tx_count = tx_count; + println!( + "{}: Transactions processed {}", + val.transactions_addr, sample + ); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (sample * 1_000_000_000) as f64 / ns as f64; + println!("{}: {} tps", val.transactions_addr, tps); + let total = tx_count - first_count; + println!( + "{}: Total Transactions processed {}", + val.transactions_addr, total + ); + if total == transactions.len() as u64 { + break; + } + if i > 20 && sample == 0 { + break; + } + sleep(Duration::new(1, 0)); + } + }); + signal.store(true, Ordering::Relaxed); + for t in c_threads { + t.join().unwrap(); } } + +fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { + let mut addr = locked_addr.write().unwrap(); + let port = addr.port(); + let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 1); + let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 2); + ThinClient::new( + r.requests_addr, + requests_socket, + r.transactions_addr, + transactions_socket, + ) +} + +fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { + let mut addr = client_addr.write().unwrap(); + let port = addr.port(); + let gossip = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 1); + let daddr = "0.0.0.0:0".parse().unwrap(); + let pubkey = KeyPair::new().pubkey(); + let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); + (node, gossip) +} + +fn converge( + client_addr: &Arc>, + leader: &ReplicatedData, + exit: Arc, + num_nodes: usize, + threads: &mut Vec>, +) -> Vec { + //lets spy on the network + let daddr = "0.0.0.0:0".parse().unwrap(); + let (spy, spy_gossip) = spy_node(client_addr); + let mut spy_crdt = Crdt::new(spy); + spy_crdt.insert(&leader); + spy_crdt.set_leader(leader.id); + + let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let spy_window = default_window(); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + //wait for the network to converge + for _ in 0..30 { + let min = spy_ref.read().unwrap().convergence(); + if num_nodes as u64 == min { + println!("converged!"); + break; + } + sleep(Duration::new(1, 0)); + } + threads.push(t_spy_listen); + threads.push(t_spy_gossip); + let v: Vec = spy_ref + .read() + .unwrap() + .table + .values() + .into_iter() + .filter(|x| x.requests_addr != daddr) + .map(|x| x.clone()) + .collect(); + v.clone() +} + +fn read_leader(path: String) -> ReplicatedData { + let file = File::open(path).expect("file"); + serde_json::from_reader(file).expect("parse") +} diff --git a/src/bin/multinode-demo.rs b/src/bin/multinode-demo.rs deleted file mode 100644 index 78c62f8ea..000000000 --- a/src/bin/multinode-demo.rs +++ /dev/null @@ -1,264 +0,0 @@ -extern crate futures; -extern crate getopts; -extern crate isatty; -extern crate rayon; -extern crate serde_json; -extern crate solana; - -use futures::Future; -use getopts::Options; -use isatty::stdin_isatty; -use rayon::prelude::*; -use solana::crdt::{Crdt, ReplicatedData}; -use solana::mint::MintDemo; -use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; -use solana::streamer::default_window; -use solana::thin_client::ThinClient; -use solana::transaction::Transaction; -use std::env; -use std::fs::File; -use std::io::{stdin, Read}; -use std::net::{SocketAddr, UdpSocket}; -use std::process::exit; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; -use std::thread::sleep; -use std::time::Duration; -use std::time::Instant; - -fn print_usage(program: &str, opts: Options) { - let mut brief = format!("Usage: cat | {} [options]\n\n", program); - brief += " Solana client demo creates a number of transactions and\n"; - brief += " sends them to a target node."; - brief += " Takes json formatted mint file to stdin."; - - print!("{}", opts.usage(&brief)); -} - -fn main() { - let mut threads = 4usize; - let mut num_nodes = 10usize; - let mut leader = "leader.json".to_string(); - - let mut opts = Options::new(); - opts.optopt("l", "", "leader", "leader.json"); - opts.optopt("c", "", "client address", "host:port"); - opts.optopt("t", "", "number of threads", &format!("{}", threads)); - opts.optopt( - "n", - "", - "number of nodes to converge to", - &format!("{}", num_nodes), - ); - opts.optflag("h", "help", "print help"); - let args: Vec = env::args().collect(); - let matches = match opts.parse(&args[1..]) { - Ok(m) => m, - Err(e) => { - eprintln!("{}", e); - exit(1); - } - }; - - if matches.opt_present("h") { - let program = args[0].clone(); - print_usage(&program, opts); - return; - } - if matches.opt_present("l") { - leader = matches.opt_str("l").unwrap(); - } - let client_addr: Arc> = if matches.opt_present("c") { - let addr = matches.opt_str("c").unwrap().parse().unwrap(); - Arc::new(RwLock::new(addr)) - } else { - Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap())) - }; - if matches.opt_present("t") { - threads = matches.opt_str("t").unwrap().parse().expect("integer"); - } - if matches.opt_present("n") { - num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); - } - - let leader: ReplicatedData = read_leader(leader); - let signal = Arc::new(AtomicBool::new(false)); - let mut c_threads = vec![]; - let validators = converge( - &client_addr, - &leader, - signal.clone(), - num_nodes + 2, - &mut c_threads, - ); - - if stdin_isatty() { - eprintln!("nothing found on stdin, expected a json file"); - exit(1); - } - - let mut buffer = String::new(); - let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); - if num_bytes == 0 { - eprintln!("empty file on stdin, expected a json file"); - exit(1); - } - - println!("Parsing stdin..."); - let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { - eprintln!("failed to parse json: {}", e); - exit(1); - }); - let mut client = mk_client(&client_addr, &leader); - - println!("Get last ID..."); - let last_id = client.get_last_id().wait().unwrap(); - println!("Got last ID {:?}", last_id); - - let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes()); - - println!("Creating keypairs..."); - let txs = demo.num_accounts / 2; - let keypairs = rnd.gen_n_keypairs(demo.num_accounts); - let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); - - println!("Signing transactions..."); - let now = Instant::now(); - let transactions: Vec<_> = keypair_pairs - .into_par_iter() - .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) - .collect(); - let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let bsps = txs as f64 / ns as f64; - let nsps = ns as f64 / txs as f64; - println!( - "Done. {} thousand signatures per second, {}us per signature", - bsps * 1_000_000_f64, - nsps / 1_000_f64 - ); - - let first_count = client.transaction_count(); - println!("initial count {}", first_count); - - println!("Transfering {} transactions in {} batches", txs, threads); - let sz = transactions.len() / threads; - let chunks: Vec<_> = transactions.chunks(sz).collect(); - chunks.into_par_iter().for_each(|txs| { - println!("Transferring 1 unit {} times... to", txs.len()); - let client = mk_client(&client_addr, &leader); - for tx in txs { - client.transfer_signed(tx.clone()).unwrap(); - } - }); - - println!("Sampling tps every second...",); - validators.into_par_iter().for_each(|val| { - let mut client = mk_client(&client_addr, &val); - let mut now = Instant::now(); - let mut initial_tx_count = client.transaction_count(); - for i in 0..100 { - let tx_count = client.transaction_count(); - let duration = now.elapsed(); - now = Instant::now(); - let sample = tx_count - initial_tx_count; - initial_tx_count = tx_count; - println!( - "{}: Transactions processed {}", - val.transactions_addr, sample - ); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (sample * 1_000_000_000) as f64 / ns as f64; - println!("{}: {} tps", val.transactions_addr, tps); - let total = tx_count - first_count; - println!( - "{}: Total Transactions processed {}", - val.transactions_addr, total - ); - if total == transactions.len() as u64 { - break; - } - if i > 20 && sample == 0 { - break; - } - sleep(Duration::new(1, 0)); - } - }); - signal.store(true, Ordering::Relaxed); - for t in c_threads { - t.join().unwrap(); - } -} - -fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { - let mut addr = locked_addr.write().unwrap(); - let port = addr.port(); - let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 2); - ThinClient::new( - r.requests_addr, - requests_socket, - r.transactions_addr, - transactions_socket, - ) -} - -fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { - let mut addr = client_addr.write().unwrap(); - let port = addr.port(); - let gossip = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let daddr = "0.0.0.0:0".parse().unwrap(); - let pubkey = KeyPair::new().pubkey(); - let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); - (node, gossip) -} - -fn converge( - client_addr: &Arc>, - leader: &ReplicatedData, - exit: Arc, - num_nodes: usize, - threads: &mut Vec>, -) -> Vec { - //lets spy on the network - let daddr = "0.0.0.0:0".parse().unwrap(); - let (spy, spy_gossip) = spy_node(client_addr); - let mut spy_crdt = Crdt::new(spy); - spy_crdt.insert(&leader); - spy_crdt.set_leader(leader.id); - - let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); - let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); - //wait for the network to converge - for _ in 0..30 { - let min = spy_ref.read().unwrap().convergence(); - if num_nodes as u64 == min { - println!("converged!"); - break; - } - sleep(Duration::new(1, 0)); - } - threads.push(t_spy_listen); - threads.push(t_spy_gossip); - let v: Vec = spy_ref - .read() - .unwrap() - .table - .values() - .into_iter() - .filter(|x| x.requests_addr != daddr) - .map(|x| x.clone()) - .collect(); - v.clone() -} - -fn read_leader(path: String) -> ReplicatedData { - let file = File::open(path).expect("file"); - serde_json::from_reader(file).expect("parse") -}