diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index a8a226f54..e19341d79 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -235,7 +235,14 @@ fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket addr.set_port(port + 1); let daddr = "0.0.0.0:0".parse().unwrap(); let pubkey = KeyPair::new().pubkey(); - let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); + let node = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + daddr, + daddr, + daddr, + daddr, + ); (node, gossip) } diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index ee445428b..8741b6a4f 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -129,6 +129,7 @@ fn main() { UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind(repl_data.replicate_addr).unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(), + UdpSocket::bind(repl_data.repair_addr).unwrap(), leader, exit.clone(), ); diff --git a/src/crdt.rs b/src/crdt.rs index 7c2d7e494..e4ba429c1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -77,6 +77,9 @@ pub struct ReplicatedData { pub requests_addr: SocketAddr, /// transactions address pub transactions_addr: SocketAddr, + /// repair address, we use this to jump ahead of the packets + /// destined to the replicate_addr + pub repair_addr: SocketAddr, /// current leader identity pub current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -92,6 +95,7 @@ impl ReplicatedData { replicate_addr: SocketAddr, requests_addr: SocketAddr, transactions_addr: SocketAddr, + repair_addr: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, @@ -101,6 +105,7 @@ impl ReplicatedData { replicate_addr, requests_addr, transactions_addr, + repair_addr, current_leader_id: PublicKey::default(), last_verified_hash: Hash::default(), last_verified_count: 0, @@ -118,6 +123,7 @@ impl ReplicatedData { let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 
2); let requests_addr = Self::next_port(&bind_addr, 3); + let repair_addr = Self::next_port(&bind_addr, 4); let pubkey = KeyPair::new().pubkey(); ReplicatedData::new( pubkey, @@ -125,6 +131,7 @@ impl ReplicatedData { replicate_addr, requests_addr, transactions_addr, + repair_addr, ) } } @@ -390,7 +397,7 @@ impl Crdt { let daddr = "0.0.0.0:0".parse().unwrap(); let valid: Vec<_> = self.table .values() - .filter(|r| r.id != self.me && r.replicate_addr != daddr) + .filter(|r| r.id != self.me && r.repair_addr != daddr) .collect(); if valid.is_empty() { return Err(Error::CrdtTooSmall); @@ -509,7 +516,7 @@ impl Crdt { let sz = rblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); - outblob.meta.set_addr(&from.replicate_addr); + outblob.meta.set_addr(&from.repair_addr); //TODO, set the sender id to the requester so we dont retransmit //come up with a cleaner solution for this when sender signatures are checked outblob.set_id(from.id).expect("blob set_id"); @@ -518,7 +525,7 @@ impl Crdt { } } else { assert!(window.read().unwrap()[pos].is_none()); - info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr); + info!("failed RequestWindowIndex {} {}", ix, from.repair_addr); } None } @@ -580,10 +587,10 @@ impl Crdt { trace!( "received RequestWindowIndex {} {} myaddr {}", ix, - from.replicate_addr, - me.replicate_addr + from.repair_addr, + me.repair_addr ); - assert_ne!(from.replicate_addr, me.replicate_addr); + assert_ne!(from.repair_addr, me.repair_addr); Self::run_window_request(&window, &from, ix, blob_recycler) } Err(_) => { @@ -656,6 +663,7 @@ pub struct Sockets { pub transaction: UdpSocket, pub respond: UdpSocket, pub broadcast: UdpSocket, + pub repair: UdpSocket, } pub struct TestNode { @@ -672,6 +680,7 @@ impl TestNode { let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let repair = 
UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let data = ReplicatedData::new( pubkey, @@ -679,6 +688,7 @@ impl TestNode { replicate.local_addr().unwrap(), requests.local_addr().unwrap(), transaction.local_addr().unwrap(), + repair.local_addr().unwrap(), ); TestNode { data: data, @@ -690,6 +700,7 @@ impl TestNode { transaction, respond, broadcast, + repair, }, } } @@ -698,6 +709,7 @@ impl TestNode { #[cfg(test)] mod tests { use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; + use result::Error; use signature::{KeyPair, KeyPairUtil}; #[test] @@ -719,6 +731,7 @@ mod tests { "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); @@ -736,6 +749,15 @@ mod tests { copy } #[test] + fn replicated_data_new_leader() { + let d1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); + assert_eq!(d1.gossip_addr, "127.0.0.1:1235".parse().unwrap()); + assert_eq!(d1.replicate_addr, "127.0.0.1:1236".parse().unwrap()); + assert_eq!(d1.requests_addr, "127.0.0.1:1237".parse().unwrap()); + assert_eq!(d1.transactions_addr, "127.0.0.1:1234".parse().unwrap()); + assert_eq!(d1.repair_addr, "127.0.0.1:1238".parse().unwrap()); + } + #[test] fn update_test() { let d1 = ReplicatedData::new( KeyPair::new().pubkey(), @@ -743,6 +765,7 @@ mod tests { "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), ); let d2 = ReplicatedData::new( KeyPair::new().pubkey(), @@ -750,6 +773,7 @@ mod tests { "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), ); let d3 = ReplicatedData::new( KeyPair::new().pubkey(), @@ -757,6 +781,7 @@ mod tests { "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), 
"127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), ); let mut crdt = Crdt::new(d1.clone()); let (key, ix, ups) = crdt.get_updates_since(0); @@ -784,5 +809,65 @@ mod tests { sorted(&crdt.table.values().map(|x| x.clone()).collect()) ); } + /// Test that window_index_request only picks peers with a valid repair address + #[test] + fn window_index_request() { + let me = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + let mut crdt = Crdt::new(me.clone()); + let rv = crdt.window_index_request(0); + assert_matches!(rv, Err(Error::CrdtTooSmall)); + let nxt = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "0.0.0.0:0".parse().unwrap(), + ); + crdt.insert(&nxt); + let rv = crdt.window_index_request(0); + assert_matches!(rv, Err(Error::CrdtTooSmall)); + let nxt = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.2:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + crdt.insert(&nxt); + let rv = crdt.window_index_request(0).unwrap(); + assert_eq!(nxt.gossip_addr, "127.0.0.2:1234".parse().unwrap()); + assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); + let nxt = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.3:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + "127.0.0.1:1238".parse().unwrap(), + ); + crdt.insert(&nxt); + let mut one = false; + let mut two = false; + while !one || !two { + //this randomly picks an option, so eventually it should pick both + let rv = crdt.window_index_request(0).unwrap(); + if rv.0 == 
"127.0.0.2:1234".parse().unwrap() { + one = true; + } + if rv.0 == "127.0.0.3:1234".parse().unwrap() { + two = true; + } + } + assert!(one && two); + } } diff --git a/src/server.rs b/src/server.rs index bc42164f7..525a65026 100644 --- a/src/server.rs +++ b/src/server.rs @@ -79,6 +79,7 @@ impl Server { respond_socket: UdpSocket, replicate_socket: UdpSocket, gossip_socket: UdpSocket, + repair_socket: UdpSocket, leader_repl_data: ReplicatedData, exit: Arc, ) -> Self { @@ -91,6 +92,7 @@ impl Server { me, gossip_socket, replicate_socket, + repair_socket, leader_repl_data, exit.clone(), ); @@ -98,3 +100,34 @@ impl Server { Server { thread_hdls } } } +#[cfg(test)] +mod tests { + use bank::Bank; + use crdt::TestNode; + use mint::Mint; + use server::Server; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + #[test] + fn validator_exit() { + let tn = TestNode::new(); + let alice = Mint::new(10_000); + let bank = Bank::new(&alice); + let exit = Arc::new(AtomicBool::new(false)); + let v = Server::new_validator( + bank, + tn.data.clone(), + tn.sockets.requests, + tn.sockets.respond, + tn.sockets.replicate, + tn.sockets.gossip, + tn.sockets.repair, + tn.data, + exit.clone(), + ); + exit.store(true, Ordering::Relaxed); + for t in v.thread_hdls { + t.join().unwrap(); + } + } +} diff --git a/src/streamer.rs b/src/streamer.rs index 2b8899a80..5019651f2 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -602,10 +602,8 @@ mod bench { #[cfg(test)] mod test { - use crdt::{Crdt, ReplicatedData}; + use crdt::{Crdt, TestNode}; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; - use signature::KeyPair; - use signature::KeyPairUtil; use std::collections::VecDeque; use std::io; use std::io::Write; @@ -688,29 +686,21 @@ mod test { #[test] pub fn window_send_test() { - let pubkey_me = KeyPair::new().pubkey(); - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = read.local_addr().unwrap(); - let send = 
UdpSocket::bind("127.0.0.1:0").expect("bind"); - let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let transaction = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let tn = TestNode::new(); let exit = Arc::new(AtomicBool::new(false)); - let rep_data = ReplicatedData::new( - pubkey_me, - read.local_addr().unwrap(), - send.local_addr().unwrap(), - serve.local_addr().unwrap(), - transaction.local_addr().unwrap(), - ); - let mut crdt_me = Crdt::new(rep_data); + let mut crdt_me = Crdt::new(tn.data.clone()); let me_id = crdt_me.my_data().id; crdt_me.set_leader(me_id); let subs = Arc::new(RwLock::new(crdt_me)); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = - blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); + let t_receiver = blob_receiver( + exit.clone(), + resp_recycler.clone(), + tn.sockets.gossip, + s_reader, + ).unwrap(); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = default_window(); @@ -724,7 +714,12 @@ mod test { s_retransmit, ); let (s_responder, r_responder) = channel(); - let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = responder( + tn.sockets.replicate, + exit.clone(), + resp_recycler.clone(), + r_responder, + ); let mut msgs = VecDeque::new(); for v in 0..10 { let i = 9 - v; @@ -735,7 +730,7 @@ mod test { w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + w.meta.set_addr(&tn.data.gossip_addr); msgs.push_back(b_); } s_responder.send(msgs).expect("send"); diff --git a/src/tvu.rs b/src/tvu.rs index 79e8d08ff..a47b3658a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -51,6 +51,7 @@ impl Tvu { me: ReplicatedData, gossip_listen_socket: UdpSocket, replicate: UdpSocket, + repair_socket: UdpSocket, leader: ReplicatedData, exit: Arc, ) -> Self { @@ -96,6 +97,12 @@ impl Tvu { blob_recycler.clone(), 
retransmit_receiver, ); + let t_repair_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + repair_socket, + blob_sender.clone(), + ).expect("tvu: blob repair receiver fail"); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified @@ -122,6 +129,7 @@ impl Tvu { t_blob_receiver, t_retransmit, t_window, + t_repair_receiver, replicate_stage.thread_hdl, ]; threads.extend(data_replicator.thread_hdls.into_iter()); @@ -218,6 +226,7 @@ pub mod tests { target1.data, target1.sockets.gossip, target1.sockets.replicate, + target1.sockets.repair, leader.data, exit.clone(), ); diff --git a/tests/multinode.rs b/tests/multinode.rs index bf53fa734..b333a2888 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -37,6 +37,7 @@ fn validator( validator.sockets.respond, validator.sockets.replicate, validator.sockets.gossip, + validator.sockets.repair, leader.clone(), exit.clone(), );