diff --git a/ci/test-stable.sh b/ci/test-stable.sh index faf6f73784..38f0482642 100755 --- a/ci/test-stable.sh +++ b/ci/test-stable.sh @@ -15,4 +15,3 @@ _ rustup component add rustfmt-preview _ cargo fmt -- --write-mode=check _ cargo build --verbose _ cargo test --verbose -_ cargo test -- --ignored diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9212814174..d5c21dc301 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -8,7 +8,7 @@ extern crate solana; use atty::{is, Stream}; use getopts::Options; use solana::crdt::{ReplicatedData, TestNode}; -use solana::fullnode::FullNode; +use solana::fullnode::{FullNode, LedgerFile}; use std::env; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -83,12 +83,23 @@ fn main() -> () { let fullnode = if matches.opt_present("t") { let testnet_address_string = matches.opt_str("t").unwrap(); let testnet_addr = testnet_address_string.parse().unwrap(); - FullNode::new(node, false, None, Some(testnet_addr), None, exit) + FullNode::new( + node, + false, + LedgerFile::StdIn, + Some(testnet_addr), + LedgerFile::NoFile, + exit, + ) } else { node.data.current_leader_id = node.data.id.clone(); - let outfile = matches.opt_str("o"); - FullNode::new(node, true, None, None, outfile, exit) + let outfile = if let Some(f) = matches.opt_str("o") { + LedgerFile::Path(f) + } else { + LedgerFile::StdIn + }; + FullNode::new(node, true, LedgerFile::StdIn, None, outfile, exit) }; for t in fullnode.thread_hdls { t.join().expect("join"); diff --git a/src/crdt.rs b/src/crdt.rs index a23ca16096..6e136b5da1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -109,6 +109,12 @@ pub struct ReplicatedData { last_verified_count: u64, } +fn make_debug_id(buf: &[u8]) -> u64 { + let mut rdr = Cursor::new(&buf[..8]); + rdr.read_u64::() + .expect("rdr.read_u64 in fn debug_id") +} + impl ReplicatedData { pub fn new( id: PublicKey, @@ -132,7 +138,9 @@ impl ReplicatedData { last_verified_count: 0, } } - + pub fn debug_id(&self) -> u64 { + make_debug_id(&self.id) + } fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = addr.clone(); nxt_addr.set_port(addr.port() + nxt); @@ -224,6 +232,9 @@ impl Crdt { g.table.insert(me.id, me); g } + pub fn debug_id(&self) -> u64 { + make_debug_id(&self.me) + } pub fn my_data(&self) -> &ReplicatedData { &self.table[&self.me] } @@ -233,9 +244,14 @@ impl Crdt { pub fn set_leader(&mut self, key: PublicKey) -> () { let mut me = self.my_data().clone(); + info!( + "{:x}: setting leader to {:x} from {:x}", + me.debug_id(), + make_debug_id(&key), + make_debug_id(&me.current_leader_id), + ); me.current_leader_id = key; me.version += 1; - info!("setting leader to {:?}", &key[..4]); self.insert(&me); } @@ -249,9 +265,9 @@ impl Crdt { //somehow we signed a message for our own identity with a higher version that // we have stored ourselves trace!( - "me: {:?} v.id: {:?} version: {}", - &self.me[..4], - &v.id[..4], + "me: {:x} v.id: {:x} version: {}", + self.debug_id(), + v.debug_id(), v.version ); self.update_index += 1; @@ -259,9 +275,9 @@ impl Crdt { let _ = self.local.insert(v.id, self.update_index); } else { trace!( - "INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}", - &self.me[..4], - &v.id[..4], + "INSERT FAILED me: {:x} data: {:?} new.version: {} me.version: {}", + self.debug_id(), + v.debug_id(), v.version, self.table[&v.id].version ); @@ -289,10 +305,15 @@ impl Crdt { .iter() .filter_map(|(&k, v)| { if k != self.me && (now - v) > limit { - info!("purge {:?} {}", &k[..4], now - v); + info!("purge {:x} {}", make_debug_id(&k), now - v); Some(k) } else { - trace!("purge skipped {:?} {} {}", &k[..4], now - v, limit); + trace!( + "purge skipped {:x} {} {}", + make_debug_id(&k), + now - v, + limit + ); None } }) @@ -317,7 +338,11 @@ impl Crdt { ) -> Result<()> { let me: ReplicatedData = { let robj = obj.read().expect("'obj' read lock in crdt::index_blobs"); - debug!("broadcast table {}", robj.table.len()); + debug!( + "{:x}: broadcast table {}", + robj.debug_id(), + robj.table.len() + ); robj.table[&robj.me].clone() }; @@ -454,10 +479,11 @@ impl Crdt { let errs: Vec<_> = orders .par_iter() .map(|v| { - trace!( - "retransmit blob {} to {}", + debug!( + "{:x}: retransmit blob {} to {:x}", + me.debug_id(), rblob.get_index().unwrap(), - v.replicate_addr + v.debug_id(), ); //TODO profile this, may need multiple sockets for par_iter assert!(rblob.meta.size < BLOB_SIZE); @@ -556,9 +582,9 @@ impl Crdt { let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); trace!( - "created gossip request from {:?} to {:?} {}", - &self.me[..4], - &v.id[..4], + "created gossip request from {:x} to {:x} {}", + self.debug_id(), + v.debug_id(), v.gossip_addr ); @@ -596,11 +622,17 @@ impl Crdt { for v in cur { let cnt = table.entry(&v.current_leader_id).or_insert(0); *cnt += 1; - trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt); + trace!("leader {:x} {}", make_debug_id(&v.current_leader_id), *cnt); } let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); - if sorted.len() > 0 { - debug!("sorted leaders {:?}", sorted); + let my_id = self.debug_id(); + for x in sorted.iter() { + trace!( + "{:x}: sorted leaders {:x} votes: {}", + my_id, + make_debug_id(&x.0), + x.1 + ); } sorted.sort_by_key(|a| a.1); sorted.last().map(|a| *a.0) @@ -769,18 +801,18 @@ impl Crdt { if len < 1 { let me = obj.read().unwrap(); trace!( - "no updates me {:?} ix {} since {}", - &me.me[..4], + "no updates me {:x} ix {} since {}", + me.debug_id(), me.update_index, v ); None } else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) { trace!( - "sending updates me {:?} len {} to {:?} {}", - &obj.read().unwrap().me[..4], + "sending updates me {:x} len {} to {:x} {}", + obj.read().unwrap().debug_id(), len, - &from_rd.id[..4], + from_rd.debug_id(), addr, ); Some(r) @@ -790,7 +822,12 @@ impl Crdt { } } Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => { - trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len()); + trace!( + "ReceivedUpdates {:x} {} {}", + make_debug_id(&from), + ups, + data.len() + ); obj.write() .expect("'obj' write lock in ReceiveUpdates") .apply_updates(from, ups, &data, &external_liveness); diff --git a/src/drone.rs b/src/drone.rs index 98057d6612..1d308079dc 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -245,17 +245,13 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let carlos_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); + let leader_data = leader.data.clone(); let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + leader, exit.clone(), sink(), ); @@ -266,8 +262,8 @@ mod tests { let mut drone = Drone::new( alice.keypair(), addr, - leader.data.transactions_addr, - leader.data.requests_addr, + leader_data.transactions_addr, + leader_data.requests_addr, None, Some(5_000_050), ); @@ -291,9 +287,9 @@ mod tests { UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); let mut client = ThinClient::new( - leader.data.requests_addr, + leader_data.requests_addr, requests_socket, - leader.data.transactions_addr, + leader_data.transactions_addr, transactions_socket, ); diff --git a/src/fullnode.rs b/src/fullnode.rs index c72cf22ef1..c23cc47b84 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -8,9 +8,8 @@ use packet::BlobRecycler; use rpu::Rpu; use std::fs::File; use std::io::Write; -use std::io::{stdin, stdout, BufReader}; +use std::io::{sink, stdin, stdout, BufReader}; use std::net::SocketAddr; -use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; @@ -24,30 +23,42 @@ pub struct FullNode { pub thread_hdls: Vec>, } +pub enum LedgerFile { + NoFile, + StdIn, + StdOut, + Sink, + Path(String), +} + impl FullNode { pub fn new( mut node: TestNode, leader: bool, - infile: Option, + infile: LedgerFile, network_entry_for_validator: Option, - outfile_for_leader: Option, + outfile_for_leader: LedgerFile, exit: Arc, ) -> FullNode { info!("creating bank..."); let bank = Bank::default(); - let entry_height = if let Some(path) = infile { - let f = File::open(path).unwrap(); - let mut r = BufReader::new(f); - let entries = - entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - bank.process_ledger(entries).expect("process_ledger") - } else { - let mut r = BufReader::new(stdin()); - let entries = - entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - bank.process_ledger(entries).expect("process_ledger") + let entry_height = match infile { + LedgerFile::Path(path) => { + let f = File::open(path).unwrap(); + let mut r = BufReader::new(f); + let entries = + entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + } + LedgerFile::StdIn => { + let mut r = BufReader::new(stdin()); + let entries = + entry_writer::read_entries(&mut r).map(|e| e.expect("failed to parse entry")); + info!("processing ledger..."); + bank.process_ledger(entries).expect("process_ledger") + } + _ => panic!("expected LedgerFile::StdIn or LedgerFile::Path for infile"), }; // entry_height is the network-wide agreed height of the ledger. @@ -62,6 +73,7 @@ impl FullNode { "starting... local gossip address: {} (advertising {})", local_gossip_addr, node.data.gossip_addr ); + let requests_addr = node.data.requests_addr.clone(); if !leader { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); @@ -69,56 +81,57 @@ impl FullNode { let server = FullNode::new_validator( bank, entry_height, - node.data.clone(), - node.sockets.requests, - node.sockets.respond, - node.sockets.replicate, - node.sockets.gossip, - node.sockets.repair, + node, network_entry_point, exit.clone(), ); info!( "validator ready... local request address: {} (advertising {}) connected to: {}", - local_requests_addr, node.data.requests_addr, testnet_addr + local_requests_addr, requests_addr, testnet_addr ); server } else { node.data.current_leader_id = node.data.id.clone(); - let server = if let Some(file) = outfile_for_leader { - FullNode::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - node.data.clone(), - node.sockets.requests, - node.sockets.transaction, - node.sockets.broadcast, - node.sockets.respond, - node.sockets.gossip, - exit.clone(), - File::create(file).expect("opening ledger file"), - ) - } else { - FullNode::new_leader( - bank, - entry_height, - //Some(Duration::from_millis(1000)), - None, - node.data.clone(), - node.sockets.requests, - node.sockets.transaction, - node.sockets.broadcast, - node.sockets.respond, - node.sockets.gossip, - exit.clone(), - stdout(), - ) - }; + let server = + match outfile_for_leader { + LedgerFile::Path(file) => { + FullNode::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node, + exit.clone(), + File::create(file).expect("opening ledger file"), + ) + }, + LedgerFile::StdOut => { + FullNode::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node, + exit.clone(), + stdout(), + ) + }, + LedgerFile::Sink => { + FullNode::new_leader( + bank, + entry_height, + //Some(Duration::from_millis(1000)), + None, + node, + exit.clone(), + sink(), + ) + }, + _ => panic!("expected LedgerFile::StdOut, LedgerFile::Path, or LedgerFile::Sink, for infile"), + }; info!( "leader ready... local request address: {} (advertising {})", - local_requests_addr, node.data.requests_addr + local_requests_addr, requests_addr ); server } @@ -151,45 +164,43 @@ impl FullNode { bank: Bank, entry_height: u64, tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - transactions_socket: UdpSocket, - broadcast_socket: UdpSocket, - respond_socket: UdpSocket, - gossip_socket: UdpSocket, + node: TestNode, exit: Arc, writer: W, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; - let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + let rpu = Rpu::new( + bank.clone(), + node.sockets.requests, + node.sockets.respond, + exit.clone(), + ); thread_hdls.extend(rpu.thread_hdls); let blob_recycler = BlobRecycler::default(); let (tpu, blob_receiver) = Tpu::new( bank.clone(), tick_duration, - transactions_socket, + node.sockets.transaction, blob_recycler.clone(), exit.clone(), writer, ); thread_hdls.extend(tpu.thread_hdls); - - let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); let window = streamer::default_window(); - let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let ncp = Ncp::new( crdt.clone(), window.clone(), - gossip_socket, - gossip_send_socket, + node.sockets.gossip, + node.sockets.gossip_send, exit.clone(), ).expect("Ncp::new"); thread_hdls.extend(ncp.thread_hdls); let t_broadcast = streamer::broadcaster( - broadcast_socket, + node.sockets.broadcast, exit.clone(), crdt, window, @@ -234,32 +245,30 @@ impl FullNode { pub fn new_validator( bank: Bank, entry_height: u64, - me: ReplicatedData, - requests_socket: UdpSocket, - respond_socket: UdpSocket, - replicate_socket: UdpSocket, - gossip_listen_socket: UdpSocket, - repair_socket: UdpSocket, + node: TestNode, entry_point: ReplicatedData, exit: Arc, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; - let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + let rpu = Rpu::new( + bank.clone(), + node.sockets.requests, + node.sockets.respond, + exit.clone(), + ); thread_hdls.extend(rpu.thread_hdls); - let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") .insert(&entry_point); let window = streamer::default_window(); - let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let ncp = Ncp::new( crdt.clone(), window.clone(), - gossip_listen_socket, - gossip_send_socket, + node.sockets.gossip, + node.sockets.gossip_send, exit.clone(), ).expect("Ncp::new"); @@ -268,9 +277,9 @@ impl FullNode { entry_height, crdt.clone(), window.clone(), - replicate_socket, - repair_socket, - retransmit_socket, + node.sockets.replicate, + node.sockets.repair, + node.sockets.retransmit, exit.clone(), ); thread_hdls.extend(tvu.thread_hdls); @@ -292,18 +301,8 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); - let v = FullNode::new_validator( - bank, - 0, - tn.data.clone(), - tn.sockets.requests, - tn.sockets.respond, - tn.sockets.replicate, - tn.sockets.gossip, - tn.sockets.repair, - tn.data, - exit.clone(), - ); + let entry = tn.data.clone(); + let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone()); exit.store(true, Ordering::Relaxed); for t in v.thread_hdls { t.join().unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index 739489e91e..25bc7d0865 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,6 +1,6 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crdt::Crdt; +use crdt::{Crdt, ReplicatedData}; #[cfg(feature = "erasure")] use erasure; use packet::{ @@ -92,7 +92,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> break; } } - debug!("batch len {}", batch.len()); + trace!("batch len {}", batch.len()); Ok((batch, len)) } @@ -171,6 +171,7 @@ fn find_next_missing( } fn repair_window( + debug_id: u64, locked_window: &Window, crdt: &Arc>, _recycler: &BlobRecycler, @@ -199,14 +200,33 @@ fn repair_window( *times += 1; //if times flips from all 1s 7 -> 8, 15 -> 16, we retry otherwise return Ok if *times & (*times - 1) != 0 { - trace!("repair_window counter {} {}", *times, *consumed); + trace!( + "repair_window counter {} {} {}", + *times, + *consumed, + *received + ); return Ok(()); } + let reqs = find_next_missing(locked_window, crdt, consumed, received)?; + if reqs.len() > 0 { + debug!( + "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}", + debug_id, + *times, + *consumed, + *received, + reqs.len() + ); + } let sock = UdpSocket::bind("0.0.0.0:0")?; for (to, req) in reqs { //todo cache socket - info!("repair_window request {} {} {}", *consumed, *received, to); + info!( + "{:x} repair_window request {} {} {}", + debug_id, *consumed, *received, to + ); assert!(req.len() < BLOB_SIZE); sock.send_to(&req, to)?; } @@ -214,6 +234,7 @@ fn repair_window( } fn recv_window( + debug_id: u64, locked_window: &Window, crdt: &Arc>, recycler: &BlobRecycler, @@ -225,45 +246,49 @@ fn recv_window( ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; - let leader_id = crdt.read() + let maybe_leader: Option = crdt.read() .expect("'crdt' read lock in fn recv_window") .leader_data() - .expect("leader not ready") - .id; + .cloned(); while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } { //retransmit all leader blocks let mut retransmitq = VecDeque::new(); - for b in &dq { - let p = b.read().expect("'b' read lock in fn recv_window"); - //TODO this check isn't safe against adverserial packets - //we need to maintain a sequence window - trace!( - "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index().expect("get_index in fn recv_window"), - p.get_id().expect("get_id in trace! fn recv_window"), - p.meta.addr(), - leader_id - ); - if p.get_id().expect("get_id in fn recv_window") == leader_id { - //TODO - //need to copy the retransmitted blob - //otherwise we get into races with which thread - //should do the recycling - // - //a better abstraction would be to recycle when the blob - //is dropped via a weakref to the recycler - let nv = recycler.allocate(); - { - let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); - let sz = p.meta.size; - mnv.meta.size = sz; - mnv.data[..sz].copy_from_slice(&p.data[..sz]); + if let Some(leader) = maybe_leader { + for b in &dq { + let p = b.read().expect("'b' read lock in fn recv_window"); + //TODO this check isn't safe against adverserial packets + //we need to maintain a sequence window + let leader_id = leader.id; + trace!( + "idx: {} addr: {:?} id: {:?} leader: {:?}", + p.get_index().expect("get_index in fn recv_window"), + p.get_id().expect("get_id in trace! fn recv_window"), + p.meta.addr(), + leader_id + ); + if p.get_id().expect("get_id in fn recv_window") == leader_id { + //TODO + //need to copy the retransmitted blob + //otherwise we get into races with which thread + //should do the recycling + // + //a better abstraction would be to recycle when the blob + //is dropped via a weakref to the recycler + let nv = recycler.allocate(); + { + let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); + let sz = p.meta.size; + mnv.meta.size = sz; + mnv.data[..sz].copy_from_slice(&p.data[..sz]); + } + retransmitq.push_back(nv); } - retransmitq.push_back(nv); } + } else { + warn!("{:x}: no leader to retransmit from", debug_id); } if !retransmitq.is_empty() { retransmit.send(retransmitq)?; @@ -283,8 +308,8 @@ fn recv_window( // probably from a repair window request if pix < *consumed { debug!( - "received: {} but older than consumed: {} skipping..", - pix, *consumed + "{:x}: received: {} but older than consumed: {} skipping..", + debug_id, pix, *consumed ); continue; } @@ -316,9 +341,9 @@ fn recv_window( window[w] = Some(b); } else if let Some(cblob) = &window[w] { if cblob.read().unwrap().get_index().unwrap() != pix as u64 { - warn!("overrun blob at index {:}", w); + warn!("{:x}: overrun blob at index {:}", debug_id, w); } else { - debug!("duplicate blob at index {:}", w); + debug!("{:x}: duplicate blob at index {:}", debug_id, w); } } loop { @@ -404,7 +429,7 @@ fn print_window(locked_window: &Window, consumed: u64) { } }) .collect(); - debug!("WINDOW ({}): {}", consumed, buf.join("")); + trace!("WINDOW ({}): {}", consumed, buf.join("")); } } @@ -429,11 +454,13 @@ pub fn window( let mut received = entry_height; let mut last = entry_height; let mut times = 0; + let debug_id = crdt.read().unwrap().debug_id(); loop { if exit.load(Ordering::Relaxed) { break; } let _ = recv_window( + debug_id, &window, &crdt, &recycler, @@ -444,6 +471,7 @@ pub fn window( &retransmit, ); let _ = repair_window( + debug_id, &window, &crdt, &recycler, @@ -452,12 +480,14 @@ pub fn window( &mut consumed, &mut received, ); + assert!(consumed <= (received + 1)); } }) .unwrap() } fn broadcast( + debug_id: u64, crdt: &Arc>, window: &Window, recycler: &BlobRecycler, @@ -487,7 +517,7 @@ fn broadcast( erasure::add_coding_blobs(recycler, &mut blobs, *receive_index); let blobs_len = blobs.len(); - debug!("broadcast blobs.len: {}", blobs_len); + debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); // Index the blobs Crdt::index_blobs(crdt, &blobs, receive_index)?; @@ -558,11 +588,13 @@ pub fn broadcaster( .spawn(move || { let mut transmit_index = entry_height; let mut receive_index = entry_height; + let debug_id = crdt.read().unwrap().debug_id(); loop { if exit.load(Ordering::Relaxed) { break; } let _ = broadcast( + debug_id, &crdt, &window, &recycler, @@ -737,6 +769,7 @@ mod bench { #[cfg(test)] mod test { use crdt::{Crdt, TestNode}; + use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use std::collections::VecDeque; use std::io; @@ -821,6 +854,7 @@ mod test { #[test] pub fn window_send_test() { + logger::setup(); let tn = TestNode::new(); let exit = Arc::new(AtomicBool::new(false)); let mut crdt_me = Crdt::new(tn.data.clone()); diff --git a/src/thin_client.rs b/src/thin_client.rs index 90cb1eab3e..cd1866f0c9 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -241,6 +241,7 @@ mod tests { fn test_thin_client() { logger::setup(); let leader = TestNode::new(); + let leader_data = leader.data.clone(); let alice = Mint::new(10_000); let bank = Bank::new(&alice); @@ -251,12 +252,7 @@ mod tests { bank, 0, Some(Duration::from_millis(30)), - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + leader, exit.clone(), sink(), ); @@ -266,9 +262,9 @@ mod tests { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.data.requests_addr, + leader_data.requests_addr, requests_socket, - leader.data.transactions_addr, + leader_data.transactions_addr, transactions_socket, ); let last_id = client.get_last_id(); @@ -291,17 +287,13 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); + let leader_data = leader.data.clone(); let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + leader, exit.clone(), sink(), ); @@ -313,9 +305,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.data.requests_addr, + leader_data.requests_addr, requests_socket, - leader.data.transactions_addr, + leader_data.transactions_addr, transactions_socket, ); let last_id = client.get_last_id(); @@ -349,17 +341,12 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - + let leader_data = leader.data.clone(); let server = FullNode::new_leader( bank, 0, Some(Duration::from_millis(30)), - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + leader, exit.clone(), sink(), ); @@ -371,9 +358,9 @@ mod tests { .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader.data.requests_addr, + leader_data.requests_addr, requests_socket, - leader.data.transactions_addr, + leader_data.transactions_addr, transactions_socket, ); let last_id = client.get_last_id(); diff --git a/tests/multinode.rs b/tests/multinode.rs index d53b0562d9..3c772c504a 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1,57 +1,29 @@ #[macro_use] extern crate log; extern crate bincode; +extern crate serde_json; extern crate solana; -use solana::bank::Bank; use solana::crdt::TestNode; use solana::crdt::{Crdt, ReplicatedData}; -use solana::fullnode::FullNode; +use solana::entry_writer::EntryWriter; +use solana::fullnode::{FullNode, LedgerFile}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; use solana::thin_client::ThinClient; -use std::io; -use std::io::sink; +use std::fs::File; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; -use std::thread::JoinHandle; use std::time::Duration; -fn validator( - leader: &ReplicatedData, - exit: Arc, - alice: &Mint, - threads: &mut Vec>, -) { - let validator = TestNode::new(); - let replicant_bank = Bank::new(&alice); - let mut ts = FullNode::new_validator( - replicant_bank, - 0, - validator.data.clone(), - validator.sockets.requests, - validator.sockets.respond, - validator.sockets.replicate, - validator.sockets.gossip, - validator.sockets.repair, - leader.clone(), - exit.clone(), - ); - threads.append(&mut ts.thread_hdls); -} - -fn converge( - leader: &ReplicatedData, - exit: Arc, - num_nodes: usize, - threads: &mut Vec>, -) -> (Vec, PublicKey) { +fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec { //lets spy on the network + let exit = Arc::new(AtomicBool::new(false)); let mut spy = TestNode::new(); let daddr = "0.0.0.0:0".parse().unwrap(); let me = spy.data.id.clone(); @@ -67,30 +39,49 @@ fn converge( spy_window, spy.sockets.gossip, spy.sockets.gossip_send, - exit, + exit.clone(), ).unwrap(); //wait for the network to converge let mut converged = false; + let mut rv = vec![]; for _ in 0..30 { let num = spy_ref.read().unwrap().convergence(); - if num == num_nodes as u64 { + let mut v: Vec = spy_ref + .read() + .unwrap() + .table + .values() + .into_iter() + .filter(|x| x.id != me) + .filter(|x| x.requests_addr != daddr) + .cloned() + .collect(); + if num >= num_nodes as u64 && v.len() >= num_nodes { + rv.append(&mut v); converged = true; break; } sleep(Duration::new(1, 0)); } assert!(converged); - threads.extend(dr.thread_hdls.into_iter()); - let v: Vec = spy_ref - .read() - .unwrap() - .table - .values() - .into_iter() - .filter(|x| x.id != me) - .map(|x| x.clone()) - .collect(); - (v.clone(), me) + exit.store(true, Ordering::Relaxed); + for t in dr.thread_hdls.into_iter() { + t.join().unwrap(); + } + rv +} + +fn genesis(num: i64) -> (Mint, String) { + let mint = Mint::new(num); + let id = { + let ids: Vec<_> = mint.pubkey().iter().map(|id| format!("{}", id)).collect(); + ids.join("") + }; + let path = format!("target/test_multi_node_dynamic_network-{}.log", id); + let mut writer = File::create(path.clone()).unwrap(); + + EntryWriter::write_entries(&mut writer, mint.create_entries()).unwrap(); + (mint, path.to_string()) } #[test] @@ -99,34 +90,38 @@ fn test_multi_node_validator_catchup_from_zero() { const N: usize = 5; trace!("test_multi_node_validator_catchup_from_zero"); let leader = TestNode::new(); - let alice = Mint::new(10_000); + let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_bank = Bank::new(&alice); - let server = FullNode::new_leader( - leader_bank, - 0, + let (alice, ledger_path) = genesis(10_000); + let server = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), None, - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + LedgerFile::Sink, exit.clone(), - sink(), ); - let mut threads = server.thread_hdls; for _ in 0..N { - validator(&leader.data, exit.clone(), &alice, &mut threads); + let validator = TestNode::new(); + let mut val = FullNode::new( + validator, + false, + LedgerFile::Path(ledger_path.clone()), + Some(leader_data.gossip_addr), + LedgerFile::NoFile, + exit.clone(), + ); + threads.append(&mut val.thread_hdls); } - let (servers, spy_id0) = converge(&leader.data, exit.clone(), N + 2, &mut threads); + let servers = converge(&leader_data, N + 1); //contains the leader addr as well assert_eq!(servers.len(), N + 1); //verify leader can do transfer - let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap(); + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap(); assert_eq!(leader_balance, 500); //verify validator has the same balance let mut success = 0usize; @@ -144,13 +139,23 @@ fn test_multi_node_validator_catchup_from_zero() { success = 0; // start up another validator, converge and then check everyone's balances - validator(&leader.data, exit.clone(), &alice, &mut threads); - let (servers, _) = converge(&leader.data, exit.clone(), N + 4, &mut threads); + let mut val = FullNode::new( + TestNode::new(), + false, + LedgerFile::Path(ledger_path.clone()), + Some(leader_data.gossip_addr), + LedgerFile::NoFile, + exit.clone(), + ); + threads.append(&mut val.thread_hdls); + //contains the leader and new node + let servers = converge(&leader_data, N + 2); - let mut leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap(); + let mut leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap(); info!("leader balance {}", leader_balance); loop { - let mut client = mk_client(&leader.data); + let mut client = mk_client(&leader_data); leader_balance = client.poll_get_balance(&bob_pubkey).unwrap(); if leader_balance == 1000 { break; @@ -160,22 +165,20 @@ fn test_multi_node_validator_catchup_from_zero() { assert_eq!(leader_balance, 1000); for server in servers.iter() { - if server.id != spy_id0 { - let mut client = mk_client(server); - info!("1server: {:?}", server.id[0]); - for _ in 0..10 { - if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { - info!("validator balance {}", bal); - if bal == leader_balance { - success += 1; - break; - } + let mut client = mk_client(server); + info!("1server: {:?}", server.id[0]); + for _ in 0..10 { + if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { + info!("validator balance {}", bal); + if bal == leader_balance { + success += 1; + break; } - sleep(Duration::from_millis(500)); } + sleep(Duration::from_millis(500)); } } - assert_eq!(success, (servers.len() - 1)); + assert_eq!(success, servers.len()); exit.store(true, Ordering::Relaxed); for t in threads { @@ -189,34 +192,36 @@ fn test_multi_node_basic() { const N: usize = 5; trace!("test_multi_node_basic"); let leader = TestNode::new(); - let alice = Mint::new(10_000); + let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - - let leader_bank = Bank::new(&alice); - let server = FullNode::new_leader( - leader_bank, - 0, + let (alice, ledger_path) = genesis(10_000); + let server = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), None, - leader.data.clone(), - leader.sockets.requests, - leader.sockets.transaction, - leader.sockets.broadcast, - leader.sockets.respond, - leader.sockets.gossip, + LedgerFile::Sink, exit.clone(), - sink(), ); - - let mut threads = server.thread_hdls; + let threads = server.thread_hdls; for _ in 0..N { - validator(&leader.data, exit.clone(), &alice, &mut threads); + let validator = TestNode::new(); + FullNode::new( + validator, + false, + LedgerFile::Path(ledger_path.clone()), + Some(leader_data.gossip_addr), + LedgerFile::NoFile, + exit.clone(), + ); } - let (servers, _) = converge(&leader.data, exit.clone(), N + 2, &mut threads); + let servers = converge(&leader_data, N + 1); //contains the leader addr as well assert_eq!(servers.len(), N + 1); //verify leader can do transfer - let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap(); + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, None).unwrap(); assert_eq!(leader_balance, 500); //verify validator has the same balance let mut success = 0usize; @@ -235,6 +240,109 @@ fn test_multi_node_basic() { for t in threads { t.join().unwrap(); } + std::fs::remove_file(ledger_path).unwrap(); +} + +#[test] +fn test_multi_node_dynamic_network() { + logger::setup(); + const N: usize = 3; + let leader = TestNode::new(); + let bob_pubkey = KeyPair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + let (alice, ledger_path) = genesis(100_000); + let leader_data = leader.data.clone(); + let server = FullNode::new( + leader, + true, + LedgerFile::Path(ledger_path.clone()), + None, + LedgerFile::Sink, + exit.clone(), + ); + let threads = server.thread_hdls; + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); + assert_eq!(leader_balance, 500); + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); + assert_eq!(leader_balance, 1000); + + let mut vals: Vec<(ReplicatedData, Arc, FullNode)> = (0..N) + .into_iter() + .map(|_| { + let exit = Arc::new(AtomicBool::new(false)); + let validator = TestNode::new(); + let rd = validator.data.clone(); + let val = FullNode::new( + validator, + false, + LedgerFile::Path(ledger_path.clone()), + Some(leader_data.gossip_addr), + LedgerFile::NoFile, + exit.clone(), + ); + (rd, exit, val) + }) + .collect(); + for i in 0..N { + //verify leader can do transfer + let expected = ((i + 3) * 500) as i64; + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) + .unwrap(); + assert_eq!(leader_balance, expected); + //verify all validators have the same balance + let mut success = 0usize; + for server in vals.iter() { + let mut client = mk_client(&server.0); + let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); + info!("{:x} {} get_balance: {:?}", server.0.debug_id(), i, getbal); + if let Some(bal) = getbal { + if bal == leader_balance { + success += 1; + } + } + } + info!("SUCCESS {} out of {}", success, vals.len()); + // this should be almost true, or at least vals.len() - 1 while the other node catches up + //assert!(success == vals.len()); + //kill a validator + vals[i].1.store(true, Ordering::Relaxed); + let mut ts = vec![]; + ts.append(&mut vals[i].2.thread_hdls); + for t in ts.into_iter() { + t.join().unwrap(); + } + info!("{:x} KILLED", vals[i].0.debug_id()); + //add a new one + vals[i] = { + let exit = Arc::new(AtomicBool::new(false)); + let validator = TestNode::new(); + let rd = validator.data.clone(); + let val = FullNode::new( + validator, + false, + LedgerFile::Path(ledger_path.clone()), + Some(leader_data.gossip_addr), + LedgerFile::NoFile, + exit.clone(), + ); + info!("{:x} ADDED", rd.debug_id()); + (rd, exit, val) + }; + } + for (_, exit, val) in vals.into_iter() { + exit.store(true, Ordering::Relaxed); + for t in val.thread_hdls { + t.join().unwrap(); + } + } + exit.store(true, Ordering::Relaxed); + for t in threads { + t.join().unwrap(); + } + std::fs::remove_file(ledger_path).unwrap(); } fn mk_client(leader: &ReplicatedData) -> ThinClient { @@ -243,7 +351,9 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient { .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - + let daddr = "0.0.0.0:0".parse().unwrap(); + assert!(leader.requests_addr != daddr); + assert!(leader.transactions_addr != daddr); ThinClient::new( leader.requests_addr, requests_socket, @@ -252,11 +362,31 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient { ) } -fn tx_and_retry_get_balance( +fn retry_get_balance( + client: &mut ThinClient, + bob_pubkey: &PublicKey, + expected: Option, +) -> Option { + for _ in 0..10 { + let out = client.poll_get_balance(bob_pubkey); + if expected.is_none() { + return out.ok().clone(); + } + if let (Some(e), Ok(o)) = (expected, out) { + if o == e { + return Some(o); + } + } + } + None +} + +fn send_tx_and_retry_get_balance( leader: &ReplicatedData, alice: &Mint, bob_pubkey: &PublicKey, -) -> io::Result { + expected: Option, +) -> Option { let mut client = mk_client(leader); trace!("getting leader last_id"); let last_id = client.get_last_id(); @@ -264,5 +394,5 @@ fn tx_and_retry_get_balance( let _sig = client .transfer(500, &alice.keypair(), *bob_pubkey, &last_id) .unwrap(); - client.poll_get_balance(bob_pubkey) + retry_get_balance(&mut client, bob_pubkey, expected) }