diff --git a/README.md b/README.md index d7d2b18c9..2ef447da6 100644 --- a/README.md +++ b/README.md @@ -47,46 +47,52 @@ used later in this demo. $ cat mint-demo.json | cargo run --release --bin solana-genesis-demo > genesis.log ``` -Now you can start the server: +Before you start the server, make sure you know the IP address of the machine ou want to be the leader for the demo, and make sure that udp ports 8000-10000 are open on all the machines you wan to test with. Now you can start the server: ```bash - $ cat genesis.log | cargo run --release --bin solana-fullnode > transactions0.log + $ cat ./multinode-demo/leader.sh + #!/bin/bash + export RUST_LOG=solana=info + sudo sysctl -w net.core.rmem_max=26214400 + cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log + $ ./multinode-demo/leader.sh ``` Wait a few seconds for the server to initialize. It will print "Ready." when it's safe to start sending it transactions. +Now you can start some validators: + +```bash + $ cat ./multinode-demo/validator.sh + #!/bin/bash + rsync -v -e ssh $1:~/solana/mint-demo.json . + rsync -v -e ssh $1:~/solana/leader.json . + rsync -v -e ssh $1:~/solana/genesis.log . + rsync -v -e ssh $1:~/solana/leader.log . + rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a . + export RUST_LOG=solana=info + sudo sysctl -w net.core.rmem_max=26214400 + cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log + $ ./multinode-demo/validator.sh ubuntu@10.0.1.51 #The leader machine +``` + + Then, in a separate shell, let's execute some transactions. Note we pass in the JSON configuration file here, not the genesis ledger. +>>>>>>> logs ```bash - $ cat mint-demo.json | cargo run --release --bin solana-client-demo + $ cat ./multinode-demo/client.sh + #!/bin/bash + export RUST_LOG=solana=info + rsync -v -e ssh $1:~/solana/leader.json . + rsync -v -e ssh $1:~/solana/mint-demo.json . + cat mint-demo.json | cargo run --release --bin solana-full-node -- -l leader.json -c 8100 -n 1 + $ ./multinode-demo/client.sh ubuntu@10.0.1.51 #The leader machine ``` -Now kill the server with Ctrl-C, and take a look at the ledger. You should -see something similar to: - -```json -{"num_hashes":27,"id":[0, "..."],"event":"Tick"} -{"num_hashes":3,"id":[67, "..."],"event":{"Transaction":{"tokens":42}}} -{"num_hashes":27,"id":[0, "..."],"event":"Tick"} -``` - -Now restart the server from where we left off. Pass it both the genesis ledger, and -the transaction ledger. - -```bash - $ cat genesis.log transactions0.log | cargo run --release --bin solana-fullnode > transactions1.log -``` - -Lastly, run the client demo again, and verify that all funds were spent in the -previous round, and so no additional transactions are added. - -```bash - $ cat mint-demo.json | cargo run --release --bin solana-client-demo -``` - -Stop the server again, and verify there are only Tick entries, and no Transaction entries. +Try starting a more validators and reruning the client demo! Developing === diff --git a/multinode-demo/client.sh b/multinode-demo/client.sh index 96963ca11..0c9702fe7 100755 --- a/multinode-demo/client.sh +++ b/multinode-demo/client.sh @@ -1,7 +1,5 @@ #!/bin/bash -cd /home/ubuntu/solana -#git pull -export RUST_LOG=solana::crdt=trace -# scp ubuntu@18.206.1.146:~/solana/leader.json . -# scp ubuntu@18.206.1.146:~/solana/mint-demo.json . -cat mint-demo.json | cargo run --release --bin solana-multinode-demo -- -l leader.json -c 10.0.5.179:8100 -n 3 +export RUST_LOG=solana=info +rsync -v -e ssh $1:~/solana/leader.json . +rsync -v -e ssh $1:~/solana/mint-demo.json . +cat mint-demo.json | cargo run --release --bin solana-client-demo -- -l leader.json -c 8100 -n 1 diff --git a/multinode-demo/leader.sh b/multinode-demo/leader.sh index 88a129fed..eb8bf8ac2 100755 --- a/multinode-demo/leader.sh +++ b/multinode-demo/leader.sh @@ -1,6 +1,4 @@ #!/bin/bash -cd /home/ubuntu/solana -git pull export RUST_LOG=solana=info -cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -b 8000 -d | grep INFO -#cat genesis.log | cargo run --release --bin solana-testnode -- -s leader.json -b 8000 -d +sudo sysctl -w net.core.rmem_max=26214400 +cat genesis.log leader.log | cargo run --release --features cuda --bin solana-testnode -- -s leader.json -l leader.json -b 8000 -d 2>&1 | tee leader-tee.log diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index df1cddde1..1e2ce97b0 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -1,10 +1,9 @@ #!/bin/bash -cd /home/ubuntu/solana -git pull -scp ubuntu@18.206.1.146:~/solana/mint-demo.json . -scp ubuntu@18.206.1.146:~/solana/leader.json . -scp ubuntu@18.206.1.146:~/solana/genesis.log . -scp ubuntu@18.206.1.146:~/solana/libcuda_verify_ed25519.a . +rsync -v -e ssh $1:~/solana/mint-demo.json . +rsync -v -e ssh $1:~/solana/leader.json . +rsync -v -e ssh $1:~/solana/genesis.log . +rsync -v -e ssh $1:~/solana/leader.log . +rsync -v -e ssh $1:~/solana/libcuda_verify_ed25519.a . export RUST_LOG=solana=info -cat genesis.log | cargo run --release --features cuda --bin solana-testnode -- -s replicator.json -v leader.json -b 9000 -d 2>&1 | tee validator.log - +sudo sysctl -w net.core.rmem_max=26214400 +cat genesis.log leader.log | cargo run --release --features cuda --bin solana-fullnode -- -l validator.json -s validator.json -v leader.json -b 9000 -d 2>&1 | tee validator-tee.log diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 78c62f8ea..589a158ce 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -1,6 +1,7 @@ extern crate futures; extern crate getopts; extern crate isatty; +extern crate pnet; extern crate rayon; extern crate serde_json; extern crate solana; @@ -8,6 +9,7 @@ extern crate solana; use futures::Future; use getopts::Options; use isatty::stdin_isatty; +use pnet::datalink; use rayon::prelude::*; use solana::crdt::{Crdt, ReplicatedData}; use solana::mint::MintDemo; @@ -18,7 +20,7 @@ use solana::transaction::Transaction; use std::env; use std::fs::File; use std::io::{stdin, Read}; -use std::net::{SocketAddr, UdpSocket}; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -36,6 +38,17 @@ fn print_usage(program: &str, opts: Options) { print!("{}", opts.usage(&brief)); } +fn get_ip_addr() -> Option { + for iface in datalink::interfaces() { + for p in iface.ips { + if !p.ip().is_loopback() && !p.ip().is_multicast() { + return Some(p.ip()); + } + } + } + None +} + fn main() { let mut threads = 4usize; let mut num_nodes = 10usize; @@ -43,7 +56,7 @@ fn main() { let mut opts = Options::new(); opts.optopt("l", "", "leader", "leader.json"); - opts.optopt("c", "", "client address", "host:port"); + opts.optopt("c", "", "client port", "port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); opts.optopt( "n", @@ -69,12 +82,13 @@ fn main() { if matches.opt_present("l") { leader = matches.opt_str("l").unwrap(); } - let client_addr: Arc> = if matches.opt_present("c") { - let addr = matches.opt_str("c").unwrap().parse().unwrap(); - Arc::new(RwLock::new(addr)) - } else { - Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap())) - }; + let mut addr: SocketAddr = "127.0.0.1:8010".parse().unwrap(); + if matches.opt_present("c") { + let port = matches.opt_str("c").unwrap().parse().unwrap(); + addr.set_port(port); + } + addr.set_ip(get_ip_addr().unwrap()); + let client_addr: Arc> = Arc::new(RwLock::new(addr)); if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } @@ -230,7 +244,6 @@ fn converge( let mut spy_crdt = Crdt::new(spy); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); - let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index b51857967..487b30086 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -16,12 +16,12 @@ use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Instruction; use std::env; use std::fs::File; -use std::io::{stdin, stdout, Read}; +use std::io::{stdin, Read}; use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::process::exit; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use std::time::Duration; +//use std::time::Duration; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); @@ -38,6 +38,7 @@ fn main() { opts.optopt("b", "", "bind", "bind to port or address"); opts.optflag("d", "dyn", "detect network address dynamically"); opts.optopt("s", "", "save", "save my identity to path.json"); + opts.optopt("l", "", "load", "load my identity to path.json"); opts.optflag("h", "help", "print help"); opts.optopt( "v", @@ -130,6 +131,12 @@ fn main() { // we need all the receiving sockets to be bound within the expected // port range that we open on aws let mut repl_data = make_repl_data(&bind_addr); + if matches.opt_present("l") { + let path = matches.opt_str("l").unwrap(); + if let Ok(file) = File::open(path) { + repl_data = serde_json::from_reader(file).expect("parse"); + } + } let threads = if matches.opt_present("v") { eprintln!("starting validator... {}", repl_data.requests_addr); let path = matches.opt_str("v").unwrap(); @@ -149,10 +156,12 @@ fn main() { } else { eprintln!("starting leader... {}", repl_data.requests_addr); repl_data.current_leader_id = repl_data.id.clone(); + let file = File::create("leader.log").expect("leader.log create"); let server = Server::new_leader( bank, last_id, - Some(Duration::from_millis(1000)), + //Some(Duration::from_millis(1000)), + None, repl_data.clone(), UdpSocket::bind(repl_data.requests_addr).unwrap(), UdpSocket::bind(repl_data.transactions_addr).unwrap(), @@ -160,7 +169,7 @@ fn main() { UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(), exit.clone(), - stdout(), + file, ); server.thread_hdls }; @@ -169,7 +178,7 @@ fn main() { let file = File::create(path).expect("file"); serde_json::to_writer(file, &repl_data).expect("serialize"); } - eprintln!("Ready. Listening on {}", bind_addr); + eprintln!("Ready. Listening on {}", repl_data.events_addr); for t in threads { t.join().expect("join"); diff --git a/src/crdt.rs b/src/crdt.rs index d7e6b4505..838f8dfbc 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -16,7 +16,7 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use hash::Hash; -use packet::SharedBlob; +use packet::{SharedBlob, BLOB_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; @@ -226,6 +226,7 @@ impl Crdt { .expect("set_index in pub fn broadcast"); //TODO profile this, may need multiple sockets for par_iter trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr); + assert!(blob.meta.size < BLOB_SIZE); let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr); trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr); e @@ -285,6 +286,7 @@ impl Crdt { v.replicate_addr ); //TODO profile this, may need multiple sockets for par_iter + assert!(rblob.meta.size < BLOB_SIZE); s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr) }) .collect(); @@ -327,14 +329,16 @@ impl Crdt { } pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { - if self.table.len() <= 1 { + let daddr = "0.0.0.0:0".parse().unwrap(); + let valid: Vec<_> = self.table + .values() + .filter(|r| r.id != self.me && r.replicate_addr != daddr) + .collect(); + if valid.is_empty() { return Err(Error::CrdtTooSmall); } - let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).unwrap().id == self.me { - n = (Self::random() as usize) % self.table.len(); - } - let addr = self.table.values().nth(n).unwrap().gossip_addr.clone(); + let n = (Self::random() as usize) % valid.len(); + let addr = valid[n].gossip_addr.clone(); let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; Ok((addr, out)) @@ -431,6 +435,7 @@ impl Crdt { "responding RequestWindowIndex {} {}", ix, from.replicate_addr ); + assert!(outblob.len() < BLOB_SIZE); sock.send_to(&outblob, from.replicate_addr)?; } Ok(()) @@ -442,7 +447,7 @@ impl Crdt { sock: &UdpSocket, ) -> Result<()> { //TODO cache connections - let mut buf = vec![0u8; 1024 * 64]; + let mut buf = vec![0u8; BLOB_SIZE]; trace!("recv_from on {}", sock.local_addr().unwrap()); let (amt, src) = sock.recv_from(&mut buf)?; trace!("got request from {}", src); @@ -451,7 +456,7 @@ impl Crdt { match r { // TODO sigverify these Protocol::RequestUpdates(v, reqdata) => { - trace!("RequestUpdates {}", v); + trace!("RequestUpdates {} from {}", v, src); let addr = reqdata.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read() @@ -464,12 +469,13 @@ impl Crdt { obj.write() .expect("'obj' write lock in RequestUpdates") .insert(&reqdata); + assert!(rsp.len() < BLOB_SIZE); sock.send_to(&rsp, addr) .expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { - trace!("ReceivedUpdates"); + trace!("ReceivedUpdates {} from {}", ups, src); obj.write() .expect("'obj' write lock in ReceiveUpdates") .apply_updates(from, ups, &data); diff --git a/src/sigverify.rs b/src/sigverify.rs index 1a859b832..13d587aef 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -55,7 +55,7 @@ fn batch_size(batches: &Vec) -> usize { batches .iter() .map(|p| p.read().unwrap().packets.len()) - .fold(0, |x, y| x + y) + .sum() } #[cfg(not(feature = "cuda"))] diff --git a/src/streamer.rs b/src/streamer.rs index 6fdd0ca84..0e2740a0f 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -3,7 +3,7 @@ use crdt::Crdt; #[cfg(feature = "erasure")] use erasure; -use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets}; +use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; use result::{Error, Result}; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; @@ -177,10 +177,11 @@ fn repair_window( trace!("repair_window counter {} {}", *times, *consumed); return Ok(()); } - info!("repair_window request {} {}", *consumed, *received); let sock = UdpSocket::bind("0.0.0.0:0")?; for (to, req) in reqs { //todo cache socket + info!("repair_window request {} {} {}", *consumed, *received, to); + assert!(req.len() < BLOB_SIZE); sock.send_to(&req, to)?; } Ok(()) @@ -510,6 +511,7 @@ mod bench { let mut num = 0; for p in msgs_.read().unwrap().packets.iter() { let a = p.meta.addr(); + assert!(p.meta.size < packet::BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); num += 1; }