From 504b318ef1a6f20e45562ffcf97ee64a3d725627 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Wed, 23 May 2018 13:03:19 -0700 Subject: [PATCH] Hooks for binaries to run as leader or replicator and attach to network (#221) --- Cargo.toml | 5 + multinode-demo/client.sh | 7 + multinode-demo/leader.sh | 5 + multinode-demo/validator.sh | 7 + src/bin/multinode-demo.rs | 260 ++++++++++++++++++++++++++++++++++++ src/bin/testnode.rs | 173 +++++++++++++++++------- src/crdt.rs | 6 +- src/thin_client.rs | 21 +-- 8 files changed, 425 insertions(+), 59 deletions(-) create mode 100755 multinode-demo/client.sh create mode 100755 multinode-demo/leader.sh create mode 100755 multinode-demo/validator.sh create mode 100644 src/bin/multinode-demo.rs diff --git a/Cargo.toml b/Cargo.toml index f6518694e..f9580c0ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,10 @@ license = "Apache-2.0" name = "solana-client-demo" path = "src/bin/client-demo.rs" +[[bin]] +name = "solana-multinode-demo" +path = "src/bin/multinode-demo.rs" + [[bin]] name = "solana-testnode" path = "src/bin/testnode.rs" @@ -65,3 +69,4 @@ getopts = "^0.2" isatty = "0.1" futures = "0.1" rand = "0.4.2" +pnet = "^0.21.0" diff --git a/multinode-demo/client.sh b/multinode-demo/client.sh new file mode 100755 index 000000000..e430316ba --- /dev/null +++ b/multinode-demo/client.sh @@ -0,0 +1,7 @@ +#!/bin/bash +cd /home/ubuntu/solana +#git pull +export RUST_LOG=solana::crdt=trace +# scp ubuntu@18.206.1.146:~/solana/leader.json . +# scp ubuntu@18.206.1.146:~/solana/mint-demo.json . +cat mint-demo.json | cargo run --release --bin solana-multinode-demo -- -l leader.json -c 10.0.5.51:8100 -n 1 diff --git a/multinode-demo/leader.sh b/multinode-demo/leader.sh new file mode 100755 index 000000000..05c60c6bb --- /dev/null +++ b/multinode-demo/leader.sh @@ -0,0 +1,5 @@ +#!/bin/bash +cd /home/ubuntu/solana +git pull +export RUST_LOG=solana::crdt=trace +cat genesis.log | cargo run --release --bin solana-testnode -- -s leader.json -b 8000 -d diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh new file mode 100755 index 000000000..2d05f5e83 --- /dev/null +++ b/multinode-demo/validator.sh @@ -0,0 +1,7 @@ +#!/bin/bash +cd /home/ubuntu/solana +git pull +scp ubuntu@18.206.1.146:~/solana/leader.json . +scp ubuntu@18.206.1.146:~/solana/genesis.log . +export RUST_LOG=solana::crdt=trace +cat genesis.log | cargo run --release --bin solana-testnode -- -s replicator.json -r leader.json -b 9000 -d diff --git a/src/bin/multinode-demo.rs b/src/bin/multinode-demo.rs new file mode 100644 index 000000000..52caf0dab --- /dev/null +++ b/src/bin/multinode-demo.rs @@ -0,0 +1,260 @@ +extern crate futures; +extern crate getopts; +extern crate isatty; +extern crate rayon; +extern crate serde_json; +extern crate solana; +extern crate untrusted; + +use futures::Future; +use getopts::Options; +use isatty::stdin_isatty; +use rayon::prelude::*; +use solana::crdt::{Crdt, ReplicatedData}; +use solana::mint::MintDemo; +use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; +use solana::streamer::default_window; +use solana::thin_client::ThinClient; +use solana::transaction::Transaction; +use std::env; +use std::fs::File; +use std::io::{stdin, Read}; +use std::net::{SocketAddr, UdpSocket}; +use std::process::exit; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; +use std::thread::sleep; +use std::time::Duration; +use std::time::Instant; +use untrusted::Input; + +fn print_usage(program: &str, opts: Options) { + let mut brief = format!("Usage: cat | {} [options]\n\n", program); + brief += " Solana client demo creates a number of transactions and\n"; + brief += " sends them to a target node."; + brief += " Takes json formatted mint file to stdin."; + + print!("{}", opts.usage(&brief)); +} + +fn main() { + let mut threads = 4usize; + let mut num_nodes = 10usize; + let mut leader = "leader.json".to_string(); + let mut client_addr: SocketAddr = "127.0.0.1:8010".parse().unwrap(); + + let mut opts = Options::new(); + opts.optopt("l", "", "leader", "leader.json"); + opts.optopt("c", "", "client address", "host"); + opts.optopt("t", "", "number of threads", &format!("{}", threads)); + opts.optopt( + "n", + "", + "number of nodes to converge to", + &format!("{}", num_nodes), + ); + opts.optflag("h", "help", "print help"); + let args: Vec = env::args().collect(); + let matches = match opts.parse(&args[1..]) { + Ok(m) => m, + Err(e) => { + eprintln!("{}", e); + exit(1); + } + }; + + if matches.opt_present("h") { + let program = args[0].clone(); + print_usage(&program, opts); + return; + } + if matches.opt_present("l") { + leader = matches.opt_str("l").unwrap(); + } + if matches.opt_present("c") { + client_addr = matches.opt_str("c").unwrap().parse().unwrap(); + } + if matches.opt_present("t") { + threads = matches.opt_str("t").unwrap().parse().expect("integer"); + } + if matches.opt_present("n") { + num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); + } + + let leader: ReplicatedData = read_leader(leader); + let signal = Arc::new(AtomicBool::new(false)); + let mut c_threads = vec![]; + let validators = converge( + &client_addr, + &leader, + signal.clone(), + num_nodes + 2, + &mut c_threads, + ); + + if stdin_isatty() { + eprintln!("nothing found on stdin, expected a json file"); + exit(1); + } + + let mut buffer = String::new(); + let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); + if num_bytes == 0 { + eprintln!("empty file on stdin, expected a json file"); + exit(1); + } + + println!("Parsing stdin..."); + let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { + eprintln!("failed to parse json: {}", e); + exit(1); + }); + let mut client = mk_client(&client_addr, &leader); + + println!("Get last ID..."); + let last_id = client.get_last_id().wait().unwrap(); + println!("Got last ID {:?}", last_id); + + let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes()); + let tokens_per_user = 1_000; + + let users = rnd.gen_n_keys(demo.num_accounts, tokens_per_user); + + println!("Creating keypairs..."); + let txs = demo.num_accounts / 2; + let keypairs: Vec<_> = users + .into_par_iter() + .map(|(pkcs8, _)| KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap()) + .collect(); + let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); + + println!("Signing transactions..."); + let now = Instant::now(); + let transactions: Vec<_> = keypair_pairs + .into_par_iter() + .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) + .collect(); + let mut duration = now.elapsed(); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let bsps = txs as f64 / ns as f64; + let nsps = ns as f64 / txs as f64; + println!( + "Done. {} thousand signatures per second, {}us per signature", + bsps * 1_000_000_f64, + nsps / 1_000_f64 + ); + + let initial_tx_count = client.transaction_count(); + println!("initial count {}", initial_tx_count); + + println!("Transfering {} transactions in {} batches", txs, threads); + let now = Instant::now(); + let sz = transactions.len() / threads; + let chunks: Vec<_> = transactions.chunks(sz).collect(); + chunks.into_par_iter().for_each(|trs| { + println!("Transferring 1 unit {} times... to", trs.len()); + let client = mk_client(&client_addr, &leader); + for tr in trs { + client.transfer_signed(tr.clone()).unwrap(); + } + }); + + println!("Waiting for transactions to complete...",); + for _ in 0..10 { + let mut tx_count = client.transaction_count(); + duration = now.elapsed(); + let txs = tx_count - initial_tx_count; + println!("Transactions processed {}", txs); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (txs * 1_000_000_000) as f64 / ns as f64; + println!("{} tps", tps); + sleep(Duration::new(1, 0)); + } + for val in validators { + let mut client = mk_client(&client_addr, &val); + let mut tx_count = client.transaction_count(); + duration = now.elapsed(); + let txs = tx_count - initial_tx_count; + println!("Transactions processed {} on {}", txs, val.events_addr); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (txs * 1_000_000_000) as f64 / ns as f64; + println!("{} tps on {}", tps, val.events_addr); + } + signal.store(true, Ordering::Relaxed); + for t in c_threads { + t.join().unwrap(); + } +} + +fn mk_client(client_addr: &SocketAddr, r: &ReplicatedData) -> ThinClient { + let mut c = client_addr.clone(); + c.set_port(0); + let events_socket = UdpSocket::bind(c).unwrap(); + let mut addr = events_socket.local_addr().unwrap(); + let port = addr.port(); + addr.set_port(port + 1); + let requests_socket = UdpSocket::bind(addr).unwrap(); + ThinClient::new( + r.requests_addr, + requests_socket, + r.events_addr, + events_socket, + ) +} + +fn spy_node(client_addr: &SocketAddr) -> (ReplicatedData, UdpSocket) { + let mut addr = client_addr.clone(); + addr.set_port(0); + let gossip = UdpSocket::bind(addr).unwrap(); + let daddr = "0.0.0.0:0".parse().unwrap(); + let pubkey = KeyPair::new().pubkey(); + let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); + (node, gossip) +} + +fn converge( + client_addr: &SocketAddr, + leader: &ReplicatedData, + exit: Arc, + num_nodes: usize, + threads: &mut Vec>, +) -> Vec { + //lets spy on the network + let (spy, spy_gossip) = spy_node(client_addr); + let me = spy.id.clone(); + let mut spy_crdt = Crdt::new(spy); + spy_crdt.insert(&leader); + spy_crdt.set_leader(leader.id); + + let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let spy_window = default_window(); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + //wait for the network to converge + for _ in 0..30 { + let min = spy_ref.read().unwrap().convergence(); + if num_nodes as u64 == min { + println!("converged!"); + break; + } + sleep(Duration::new(1, 0)); + } + threads.push(t_spy_listen); + threads.push(t_spy_gossip); + let v: Vec = spy_ref + .read() + .unwrap() + .table + .values() + .into_iter() + .filter(|x| x.id != me) + .map(|x| x.clone()) + .collect(); + v.clone() +} + +fn read_leader(path: String) -> ReplicatedData { + let file = File::open(path).expect("file"); + serde_json::from_reader(file).expect("parse") +} diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 1663e1e60..179507f4b 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -1,11 +1,13 @@ extern crate env_logger; extern crate getopts; extern crate isatty; +extern crate pnet; extern crate serde_json; extern crate solana; use getopts::Options; use isatty::stdin_isatty; +use pnet::datalink; use solana::bank::Bank; use solana::crdt::ReplicatedData; use solana::entry::Entry; @@ -13,8 +15,9 @@ use solana::event::Event; use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; +use std::fs::File; use std::io::{stdin, stdout, Read}; -use std::net::UdpSocket; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::process::exit; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -31,10 +34,17 @@ fn print_usage(program: &str, opts: Options) { fn main() { env_logger::init().unwrap(); - let mut port = 8000u16; let mut opts = Options::new(); - opts.optopt("p", "", "port", "port"); + opts.optopt("b", "", "bind", "bind to port or address"); + opts.optflag("d", "dyn", "detect network address dynamically"); + opts.optopt("s", "", "save", "save my identity to path.json"); opts.optflag("h", "help", "print help"); + opts.optopt( + "v", + "", + "validator", + "run as replicate with path to leader.json", + ); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { Ok(m) => m, @@ -48,15 +58,14 @@ fn main() { print_usage(&program, opts); return; } - if matches.opt_present("p") { - port = matches.opt_str("p").unwrap().parse().expect("port"); - } - let serve_addr = format!("0.0.0.0:{}", port); - let gossip_addr = format!("0.0.0.0:{}", port + 1); - let replicate_addr = format!("0.0.0.0:{}", port + 2); - let events_addr = format!("0.0.0.0:{}", port + 3); - eprintln!("events_addr: {:?}", events_addr); - + let bind_addr: SocketAddr = { + let mut bind_addr = parse_port_or_addr(matches.opt_str("b")); + if matches.opt_present("d") { + let ip = get_ip_addr().unwrap(); + bind_addr.set_ip(ip); + } + bind_addr + }; if stdin_isatty() { eprintln!("nothing found on stdin, expected a log file"); exit(1); @@ -117,44 +126,110 @@ fn main() { eprintln!("creating networking stack..."); let exit = Arc::new(AtomicBool::new(false)); - let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); - serve_sock - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); + // we need all the receiving sockets to be bound within the expected + // port range that we open on aws + let mut repl_data = make_repl_data(&bind_addr); + let threads = if matches.opt_present("r") { + eprintln!("starting validator... {}", repl_data.requests_addr); + let path = matches.opt_str("r").unwrap(); + let file = File::open(path).expect("file"); + let leader = serde_json::from_reader(file).expect("parse"); + let s = Server::new_validator( + bank, + repl_data.clone(), + UdpSocket::bind(repl_data.requests_addr).unwrap(), + UdpSocket::bind("0.0.0.0:0").unwrap(), + UdpSocket::bind(repl_data.replicate_addr).unwrap(), + UdpSocket::bind(repl_data.gossip_addr).unwrap(), + leader, + exit.clone(), + ); + s.thread_hdls + } else { + eprintln!("starting leader... {}", repl_data.requests_addr); + repl_data.current_leader_id = repl_data.id.clone(); + let server = Server::new_leader( + bank, + last_id, + Some(Duration::from_millis(1000)), + repl_data.clone(), + UdpSocket::bind(repl_data.requests_addr).unwrap(), + UdpSocket::bind(repl_data.events_addr).unwrap(), + UdpSocket::bind("0.0.0.0:0").unwrap(), + UdpSocket::bind("0.0.0.0:0").unwrap(), + UdpSocket::bind(repl_data.gossip_addr).unwrap(), + exit.clone(), + stdout(), + ); + server.thread_hdls + }; + if matches.opt_present("s") { + let path = matches.opt_str("s").unwrap(); + let file = File::create(path).expect("file"); + serde_json::to_writer(file, &repl_data).expect("serialize"); + } + eprintln!("Ready. Listening on {}", bind_addr); - let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); - let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let events_sock = UdpSocket::bind(&events_addr).unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip_sock.local_addr().unwrap(), - replicate_sock.local_addr().unwrap(), - serve_sock.local_addr().unwrap(), - events_sock.local_addr().unwrap(), - ); - - let mut local = serve_sock.local_addr().unwrap(); - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local).unwrap(); - let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - - eprintln!("starting server..."); - let server = Server::new_leader( - bank, - last_id, - Some(Duration::from_millis(1000)), - d, - serve_sock, - events_sock, - broadcast_socket, - respond_socket, - gossip_sock, - exit.clone(), - stdout(), - ); - eprintln!("Ready. Listening on {}", serve_addr); - for t in server.thread_hdls { + for t in threads { t.join().expect("join"); } } + +fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr { + let mut gossip_addr = server_addr.clone(); + gossip_addr.set_port(server_addr.port() + nxt); + gossip_addr +} + +fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData { + let events_addr = bind_addr.clone(); + let gossip_addr = next_port(&bind_addr, 1); + let replicate_addr = next_port(&bind_addr, 2); + let requests_addr = next_port(&bind_addr, 3); + let pubkey = KeyPair::new().pubkey(); + ReplicatedData::new( + pubkey, + gossip_addr, + replicate_addr, + requests_addr, + events_addr, + ) +} + +fn parse_port_or_addr(optstr: Option) -> SocketAddr { + let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); + if let Some(addrstr) = optstr { + if let Ok(port) = addrstr.parse() { + let mut addr = daddr.clone(); + addr.set_port(port); + addr + } else if let Ok(addr) = addrstr.parse() { + addr + } else { + daddr + } + } else { + daddr + } +} + +fn get_ip_addr() -> Option { + for iface in datalink::interfaces() { + for p in iface.ips { + if !p.ip().is_loopback() && !p.ip().is_multicast() { + return Some(p.ip()); + } + } + } + None +} + +#[test] +fn test_parse_port_or_addr() { + let p1 = parse_port_or_addr(Some("9000".to_string())); + assert_eq!(p1.port(), 9000); + let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string())); + assert_eq!(p2.port(), 7000); + let p3 = parse_port_or_addr(None); + assert_eq!(p3.port(), 8000); +} diff --git a/src/crdt.rs b/src/crdt.rs index 1393fb5a5..41bd8c62d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -46,7 +46,7 @@ pub struct ReplicatedData { /// events address pub events_addr: SocketAddr, /// current leader identity - current_leader_id: PublicKey, + pub current_leader_id: PublicKey, /// last verified hash that was submitted to the leader last_verified_hash: Hash, /// last verified count, always increasing @@ -197,6 +197,7 @@ impl Crdt { }) .collect(); if nodes.len() < 1 { + warn!("crdt too small"); return Err(Error::CrdtTooSmall); } info!("nodes table {}", nodes.len()); @@ -347,6 +348,7 @@ impl Crdt { fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> { let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect(); if options.len() < 1 { + trace!("crdt too small for gossip"); return Err(Error::CrdtTooSmall); } let n = (Self::random() as usize) % options.len(); @@ -370,6 +372,7 @@ impl Crdt { // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have let r = serialize(&req)?; + trace!("sending gossip request to {}", remote_gossip_addr); sock.send_to(&r, remote_gossip_addr)?; Ok(()) } @@ -440,6 +443,7 @@ impl Crdt { ) -> Result<()> { //TODO cache connections let mut buf = vec![0u8; 1024 * 64]; + trace!("recv_from on {}", sock.local_addr().unwrap()); let (amt, src) = sock.recv_from(&mut buf)?; trace!("got request from {}", src); buf.resize(amt, 0); diff --git a/src/thin_client.rs b/src/thin_client.rs index 249119e0a..57010bf9f 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -321,7 +321,7 @@ mod tests { exit: Arc, num_nodes: usize, threads: &mut Vec>, - ) -> Vec { + ) -> Vec { //lets spy on the network let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); @@ -354,16 +354,16 @@ mod tests { assert!(converged); threads.push(t_spy_listen); threads.push(t_spy_gossip); - let v: Vec = spy_ref + let ret: Vec<_> = spy_ref .read() .unwrap() .table .values() .into_iter() .filter(|x| x.id != me) - .map(|x| x.requests_addr) + .map(|x| x.clone()) .collect(); - v.clone() + ret.clone() } #[test] #[ignore] @@ -377,7 +377,6 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let leader_bank = Bank::new(&alice); - let events_addr = leader.data.events_addr; let server = Server::new_leader( leader_bank, alice.last_id(), @@ -425,17 +424,21 @@ mod tests { assert_eq!(leader_balance, 500); //verify replicant has the same balance let mut success = 0usize; - for serve_addr in addrs.iter() { + for rd in addrs.iter() { let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); requests_socket .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = - ThinClient::new(*serve_addr, requests_socket, events_addr, events_socket); + let mut client = ThinClient::new( + rd.requests_addr, + requests_socket, + rd.events_addr, + events_socket, + ); for i in 0..10 { - trace!("getting replicant balance {} {}/10", *serve_addr, i); + trace!("getting replicant balance {} {}/10", rd.requests_addr, i); if let Ok(bal) = client.get_balance(&bob_pubkey) { trace!("replicant balance {}", bal); if bal == leader_balance {