Hooks for binaries to run as leader or replicator and attach to network (#221)

This commit is contained in:
anatoly yakovenko 2018-05-23 13:03:19 -07:00 committed by Greg Fitzgerald
parent f154c8c490
commit 504b318ef1
8 changed files with 425 additions and 59 deletions

View File

@ -16,6 +16,10 @@ license = "Apache-2.0"
name = "solana-client-demo"
path = "src/bin/client-demo.rs"
[[bin]]
name = "solana-multinode-demo"
path = "src/bin/multinode-demo.rs"
[[bin]]
name = "solana-testnode"
path = "src/bin/testnode.rs"
@ -65,3 +69,4 @@ getopts = "^0.2"
isatty = "0.1"
futures = "0.1"
rand = "0.4.2"
pnet = "^0.21.0"

7
multinode-demo/client.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
cd /home/ubuntu/solana
#git pull
export RUST_LOG=solana::crdt=trace
# scp ubuntu@18.206.1.146:~/solana/leader.json .
# scp ubuntu@18.206.1.146:~/solana/mint-demo.json .
cat mint-demo.json | cargo run --release --bin solana-multinode-demo -- -l leader.json -c 10.0.5.51:8100 -n 1

5
multinode-demo/leader.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/bash
cd /home/ubuntu/solana
git pull
export RUST_LOG=solana::crdt=trace
cat genesis.log | cargo run --release --bin solana-testnode -- -s leader.json -b 8000 -d

7
multinode-demo/validator.sh Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
cd /home/ubuntu/solana
git pull
scp ubuntu@18.206.1.146:~/solana/leader.json .
scp ubuntu@18.206.1.146:~/solana/genesis.log .
export RUST_LOG=solana::crdt=trace
cat genesis.log | cargo run --release --bin solana-testnode -- -s replicator.json -r leader.json -b 9000 -d

260
src/bin/multinode-demo.rs Normal file
View File

@ -0,0 +1,260 @@
extern crate futures;
extern crate getopts;
extern crate isatty;
extern crate rayon;
extern crate serde_json;
extern crate solana;
extern crate untrusted;
use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::mint::MintDemo;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::transaction::Transaction;
use std::env;
use std::fs::File;
use std::io::{stdin, Read};
use std::net::{SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
use untrusted::Input;
fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <mint.json> | {} [options]\n\n", program);
brief += " Solana client demo creates a number of transactions and\n";
brief += " sends them to a target node.";
brief += " Takes json formatted mint file to stdin.";
print!("{}", opts.usage(&brief));
}
fn main() {
let mut threads = 4usize;
let mut num_nodes = 10usize;
let mut leader = "leader.json".to_string();
let mut client_addr: SocketAddr = "127.0.0.1:8010".parse().unwrap();
let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client address", "host");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optopt(
"n",
"",
"number of nodes to converge to",
&format!("{}", num_nodes),
);
opts.optflag("h", "help", "print help");
let args: Vec<String> = env::args().collect();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(e) => {
eprintln!("{}", e);
exit(1);
}
};
if matches.opt_present("h") {
let program = args[0].clone();
print_usage(&program, opts);
return;
}
if matches.opt_present("l") {
leader = matches.opt_str("l").unwrap();
}
if matches.opt_present("c") {
client_addr = matches.opt_str("c").unwrap().parse().unwrap();
}
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
if matches.opt_present("n") {
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
}
let leader: ReplicatedData = read_leader(leader);
let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(
&client_addr,
&leader,
signal.clone(),
num_nodes + 2,
&mut c_threads,
);
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
exit(1);
}
let mut buffer = String::new();
let num_bytes = stdin().read_to_string(&mut buffer).unwrap();
if num_bytes == 0 {
eprintln!("empty file on stdin, expected a json file");
exit(1);
}
println!("Parsing stdin...");
let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mut client = mk_client(&client_addr, &leader);
println!("Get last ID...");
let last_id = client.get_last_id().wait().unwrap();
println!("Got last ID {:?}", last_id);
let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes());
let tokens_per_user = 1_000;
let users = rnd.gen_n_keys(demo.num_accounts, tokens_per_user);
println!("Creating keypairs...");
let txs = demo.num_accounts / 2;
let keypairs: Vec<_> = users
.into_par_iter()
.map(|(pkcs8, _)| KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap())
.collect();
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();
println!("Signing transactions...");
let now = Instant::now();
let transactions: Vec<_> = keypair_pairs
.into_par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();
let mut duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {} thousand signatures per second, {}us per signature",
bsps * 1_000_000_f64,
nsps / 1_000_f64
);
let initial_tx_count = client.transaction_count();
println!("initial count {}", initial_tx_count);
println!("Transfering {} transactions in {} batches", txs, threads);
let now = Instant::now();
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|trs| {
println!("Transferring 1 unit {} times... to", trs.len());
let client = mk_client(&client_addr, &leader);
for tr in trs {
client.transfer_signed(tr.clone()).unwrap();
}
});
println!("Waiting for transactions to complete...",);
for _ in 0..10 {
let mut tx_count = client.transaction_count();
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("{} tps", tps);
sleep(Duration::new(1, 0));
}
for val in validators {
let mut client = mk_client(&client_addr, &val);
let mut tx_count = client.transaction_count();
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {} on {}", txs, val.events_addr);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("{} tps on {}", tps, val.events_addr);
}
signal.store(true, Ordering::Relaxed);
for t in c_threads {
t.join().unwrap();
}
}
fn mk_client(client_addr: &SocketAddr, r: &ReplicatedData) -> ThinClient {
let mut c = client_addr.clone();
c.set_port(0);
let events_socket = UdpSocket::bind(c).unwrap();
let mut addr = events_socket.local_addr().unwrap();
let port = addr.port();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr).unwrap();
ThinClient::new(
r.requests_addr,
requests_socket,
r.events_addr,
events_socket,
)
}
fn spy_node(client_addr: &SocketAddr) -> (ReplicatedData, UdpSocket) {
let mut addr = client_addr.clone();
addr.set_port(0);
let gossip = UdpSocket::bind(addr).unwrap();
let daddr = "0.0.0.0:0".parse().unwrap();
let pubkey = KeyPair::new().pubkey();
let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr);
(node, gossip)
}
fn converge(
client_addr: &SocketAddr,
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<ReplicatedData> {
//lets spy on the network
let (spy, spy_gossip) = spy_node(client_addr);
let me = spy.id.clone();
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge
for _ in 0..30 {
let min = spy_ref.read().unwrap().convergence();
if num_nodes as u64 == min {
println!("converged!");
break;
}
sleep(Duration::new(1, 0));
}
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.id != me)
.map(|x| x.clone())
.collect();
v.clone()
}
fn read_leader(path: String) -> ReplicatedData {
let file = File::open(path).expect("file");
serde_json::from_reader(file).expect("parse")
}

View File

@ -1,11 +1,13 @@
extern crate env_logger;
extern crate getopts;
extern crate isatty;
extern crate pnet;
extern crate serde_json;
extern crate solana;
use getopts::Options;
use isatty::stdin_isatty;
use pnet::datalink;
use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
@ -13,8 +15,9 @@ use solana::event::Event;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
use std::fs::File;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
@ -31,10 +34,17 @@ fn print_usage(program: &str, opts: Options) {
fn main() {
env_logger::init().unwrap();
let mut port = 8000u16;
let mut opts = Options::new();
opts.optopt("p", "", "port", "port");
opts.optopt("b", "", "bind", "bind to port or address");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt("s", "", "save", "save my identity to path.json");
opts.optflag("h", "help", "print help");
opts.optopt(
"v",
"",
"validator",
"run as replicate with path to leader.json",
);
let args: Vec<String> = env::args().collect();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
@ -48,15 +58,14 @@ fn main() {
print_usage(&program, opts);
return;
}
if matches.opt_present("p") {
port = matches.opt_str("p").unwrap().parse().expect("port");
}
let serve_addr = format!("0.0.0.0:{}", port);
let gossip_addr = format!("0.0.0.0:{}", port + 1);
let replicate_addr = format!("0.0.0.0:{}", port + 2);
let events_addr = format!("0.0.0.0:{}", port + 3);
eprintln!("events_addr: {:?}", events_addr);
let bind_addr: SocketAddr = {
let mut bind_addr = parse_port_or_addr(matches.opt_str("b"));
if matches.opt_present("d") {
let ip = get_ip_addr().unwrap();
bind_addr.set_ip(ip);
}
bind_addr
};
if stdin_isatty() {
eprintln!("nothing found on stdin, expected a log file");
exit(1);
@ -117,44 +126,110 @@ fn main() {
eprintln!("creating networking stack...");
let exit = Arc::new(AtomicBool::new(false));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
serve_sock
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
// we need all the receiving sockets to be bound within the expected
// port range that we open on aws
let mut repl_data = make_repl_data(&bind_addr);
let threads = if matches.opt_present("r") {
eprintln!("starting validator... {}", repl_data.requests_addr);
let path = matches.opt_str("r").unwrap();
let file = File::open(path).expect("file");
let leader = serde_json::from_reader(file).expect("parse");
let s = Server::new_validator(
bank,
repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.replicate_addr).unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
leader,
exit.clone(),
);
s.thread_hdls
} else {
eprintln!("starting leader... {}", repl_data.requests_addr);
repl_data.current_leader_id = repl_data.id.clone();
let server = Server::new_leader(
bank,
last_id,
Some(Duration::from_millis(1000)),
repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind(repl_data.events_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
exit.clone(),
stdout(),
);
server.thread_hdls
};
if matches.opt_present("s") {
let path = matches.opt_str("s").unwrap();
let file = File::create(path).expect("file");
serde_json::to_writer(file, &repl_data).expect("serialize");
}
eprintln!("Ready. Listening on {}", bind_addr);
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
let events_sock = UdpSocket::bind(&events_addr).unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip_sock.local_addr().unwrap(),
replicate_sock.local_addr().unwrap(),
serve_sock.local_addr().unwrap(),
events_sock.local_addr().unwrap(),
);
let mut local = serve_sock.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server...");
let server = Server::new_leader(
bank,
last_id,
Some(Duration::from_millis(1000)),
d,
serve_sock,
events_sock,
broadcast_socket,
respond_socket,
gossip_sock,
exit.clone(),
stdout(),
);
eprintln!("Ready. Listening on {}", serve_addr);
for t in server.thread_hdls {
for t in threads {
t.join().expect("join");
}
}
fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut gossip_addr = server_addr.clone();
gossip_addr.set_port(server_addr.port() + nxt);
gossip_addr
}
fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData {
let events_addr = bind_addr.clone();
let gossip_addr = next_port(&bind_addr, 1);
let replicate_addr = next_port(&bind_addr, 2);
let requests_addr = next_port(&bind_addr, 3);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new(
pubkey,
gossip_addr,
replicate_addr,
requests_addr,
events_addr,
)
}
fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
let mut addr = daddr.clone();
addr.set_port(port);
addr
} else if let Ok(addr) = addrstr.parse() {
addr
} else {
daddr
}
} else {
daddr
}
}
fn get_ip_addr() -> Option<IpAddr> {
for iface in datalink::interfaces() {
for p in iface.ips {
if !p.ip().is_loopback() && !p.ip().is_multicast() {
return Some(p.ip());
}
}
}
None
}
#[test]
fn test_parse_port_or_addr() {
let p1 = parse_port_or_addr(Some("9000".to_string()));
assert_eq!(p1.port(), 9000);
let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string()));
assert_eq!(p2.port(), 7000);
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}

View File

@ -46,7 +46,7 @@ pub struct ReplicatedData {
/// events address
pub events_addr: SocketAddr,
/// current leader identity
current_leader_id: PublicKey,
pub current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader
last_verified_hash: Hash,
/// last verified count, always increasing
@ -197,6 +197,7 @@ impl Crdt {
})
.collect();
if nodes.len() < 1 {
warn!("crdt too small");
return Err(Error::CrdtTooSmall);
}
info!("nodes table {}", nodes.len());
@ -347,6 +348,7 @@ impl Crdt {
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
if options.len() < 1 {
trace!("crdt too small for gossip");
return Err(Error::CrdtTooSmall);
}
let n = (Self::random() as usize) % options.len();
@ -370,6 +372,7 @@ impl Crdt {
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let r = serialize(&req)?;
trace!("sending gossip request to {}", remote_gossip_addr);
sock.send_to(&r, remote_gossip_addr)?;
Ok(())
}
@ -440,6 +443,7 @@ impl Crdt {
) -> Result<()> {
//TODO cache connections
let mut buf = vec![0u8; 1024 * 64];
trace!("recv_from on {}", sock.local_addr().unwrap());
let (amt, src) = sock.recv_from(&mut buf)?;
trace!("got request from {}", src);
buf.resize(amt, 0);

View File

@ -321,7 +321,7 @@ mod tests {
exit: Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<SocketAddr> {
) -> Vec<ReplicatedData> {
//lets spy on the network
let mut spy = TestNode::new();
let daddr = "0.0.0.0:0".parse().unwrap();
@ -354,16 +354,16 @@ mod tests {
assert!(converged);
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
let v: Vec<SocketAddr> = spy_ref
let ret: Vec<_> = spy_ref
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.id != me)
.map(|x| x.requests_addr)
.map(|x| x.clone())
.collect();
v.clone()
ret.clone()
}
#[test]
#[ignore]
@ -377,7 +377,6 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let leader_bank = Bank::new(&alice);
let events_addr = leader.data.events_addr;
let server = Server::new_leader(
leader_bank,
alice.last_id(),
@ -425,17 +424,21 @@ mod tests {
assert_eq!(leader_balance, 500);
//verify replicant has the same balance
let mut success = 0usize;
for serve_addr in addrs.iter() {
for rd in addrs.iter() {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client =
ThinClient::new(*serve_addr, requests_socket, events_addr, events_socket);
let mut client = ThinClient::new(
rd.requests_addr,
requests_socket,
rd.events_addr,
events_socket,
);
for i in 0..10 {
trace!("getting replicant balance {} {}/10", *serve_addr, i);
trace!("getting replicant balance {} {}/10", rd.requests_addr, i);
if let Ok(bal) = client.get_balance(&bob_pubkey) {
trace!("replicant balance {}", bal);
if bal == leader_balance {