diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 070fca2df8..de51a382f7 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -12,6 +12,7 @@ use isatty::stdin_isatty; use pnet::datalink; use rayon::prelude::*; use solana::crdt::{Crdt, ReplicatedData}; +use solana::data_replicator::DataReplicator; use solana::mint::MintDemo; use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::streamer::default_window; @@ -245,9 +246,15 @@ fn converge( spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); - let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + let window = default_window(); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let data_replicator = DataReplicator::new( + spy_ref.clone(), + window.clone(), + spy_gossip, + gossip_send_socket, + exit.clone(), + ).expect("DataReplicator::new"); //wait for the network to converge for _ in 0..30 { let min = spy_ref.read().unwrap().convergence(); @@ -257,8 +264,7 @@ fn converge( } sleep(Duration::new(1, 0)); } - threads.push(t_spy_listen); - threads.push(t_spy_gossip); + threads.extend(data_replicator.thread_hdls.into_iter()); let v: Vec = spy_ref .read() .unwrap() diff --git a/src/crdt.rs b/src/crdt.rs index 838f8dfbc3..a78d116f51 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -16,27 +16,30 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use hash::Hash; -use packet::{SharedBlob, BLOB_SIZE}; +use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; +use signature::{KeyPair, KeyPairUtil}; use signature::{PublicKey, Signature}; use std; use std::collections::HashMap; +use std::collections::VecDeque; use std::io::Cursor; use 
std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; +use streamer::{BlobReceiver, BlobSender}; /// Structure to be replicated by the network -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct ReplicatedData { pub id: PublicKey, sig: Signature, /// should always be increasing - version: u64, + pub version: u64, /// address to connect to for gossip pub gossip_addr: SocketAddr, /// address to connect to for replication @@ -149,15 +152,20 @@ impl Crdt { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { //somehow we signed a message for our own identity with a higher version that // we have stored ourselves - trace!("me: {:?}", self.me[0]); - trace!("v.id: {:?}", v.id[0]); - trace!("insert! {}", v.version); + trace!( + "me: {:?} v.id: {:?} version: {}", + &self.me[..4], + &v.id[..4], + v.version + ); self.update_index += 1; let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.local.insert(v.id, self.update_index); } else { trace!( - "INSERT FAILED new.version: {} me.version: {}", + "INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}", + &self.me[..4], + &v.id[..4], v.version, self.table[&v.id].version ); @@ -352,18 +360,32 @@ impl Crdt { fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> { let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect(); if options.len() < 1 { - trace!("crdt too small for gossip"); + trace!( + "crdt too small for gossip {:?} {}", + &self.me[..4], + self.table.len() + ); return Err(Error::CrdtTooSmall); } let n = (Self::random() as usize) % options.len(); let v = options[n].clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); + trace!( + "created gossip 
request from {:?} to {:?} {}", + &self.me[..4], + &v.id[..4], + v.gossip_addr + ); Ok((v.gossip_addr, req)) } /// At random pick a node and try to get updated changes from them - fn run_gossip(obj: &Arc>) -> Result<()> { + fn run_gossip( + obj: &Arc>, + blob_sender: &BlobSender, + blob_recycler: &BlobRecycler, + ) -> Result<()> { //TODO we need to keep track of stakes and weight the selection by stake size //TODO cache sockets @@ -372,12 +394,12 @@ impl Crdt { let (remote_gossip_addr, req) = obj.read() .expect("'obj' read lock in fn run_gossip") .gossip_request()?; - let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have - let r = serialize(&req)?; - trace!("sending gossip request to {}", remote_gossip_addr); - sock.send_to(&r, remote_gossip_addr)?; + let blob = to_blob(req, remote_gossip_addr, blob_recycler)?; + let mut q: VecDeque = VecDeque::new(); + q.push_back(blob); + blob_sender.send(q)?; Ok(()) } @@ -397,9 +419,14 @@ impl Crdt { } /// randomly pick a node and ask them for updates asynchronously - pub fn gossip(obj: Arc>, exit: Arc) -> JoinHandle<()> { + pub fn gossip( + obj: Arc>, + blob_recycler: BlobRecycler, + blob_sender: BlobSender, + exit: Arc, + ) -> JoinHandle<()> { spawn(move || loop { - let _ = Self::run_gossip(&obj); + let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); if exit.load(Ordering::Relaxed) { return; } @@ -413,74 +440,87 @@ impl Crdt { } fn run_window_request( window: &Arc>>>, - sock: &UdpSocket, from: &ReplicatedData, ix: u64, - ) -> Result<()> { + blob_recycler: &BlobRecycler, + ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); - let mut outblob = vec![]; if let &Some(ref blob) = &window.read().unwrap()[pos] { let rblob = blob.read().unwrap(); let blob_ix = rblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { + let out = blob_recycler.allocate(); // copy to 
avoid doing IO inside the lock - outblob.extend(&rblob.data[..rblob.meta.size]); + { + let mut outblob = out.write().unwrap(); + let sz = rblob.meta.size; + outblob.meta.size = sz; + outblob.data[..sz].copy_from_slice(&rblob.data[..sz]); + outblob.meta.set_addr(&from.replicate_addr); + //TODO, set the sender id to the requester so we dont retransmit + //come up with a cleaner solution for this when sender signatures are checked + outblob.set_id(from.id).expect("blob set_id"); + } + return Some(out); } } else { assert!(window.read().unwrap()[pos].is_none()); info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr); } - if outblob.len() > 0 { - info!( - "responding RequestWindowIndex {} {}", - ix, from.replicate_addr - ); - assert!(outblob.len() < BLOB_SIZE); - sock.send_to(&outblob, from.replicate_addr)?; - } - Ok(()) + None } - /// Process messages from the network - fn run_listen( + + //TODO we should first coalesce all the requests + fn handle_blob( obj: &Arc>, window: &Arc>>>, - sock: &UdpSocket, - ) -> Result<()> { - //TODO cache connections - let mut buf = vec![0u8; BLOB_SIZE]; - trace!("recv_from on {}", sock.local_addr().unwrap()); - let (amt, src) = sock.recv_from(&mut buf)?; - trace!("got request from {}", src); - buf.resize(amt, 0); - let r = deserialize(&buf)?; - match r { + blob_recycler: &BlobRecycler, + blob: &Blob, + ) -> Option { + match deserialize(&blob.data[..blob.meta.size]) { // TODO sigverify these - Protocol::RequestUpdates(v, reqdata) => { - trace!("RequestUpdates {} from {}", v, src); + Ok(Protocol::RequestUpdates(v, reqdata)) => { + trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read() .expect("'obj' read lock in RequestUpdates") .get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); - let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; - trace!("send_to {}", 
addr); - //TODO verify reqdata belongs to sender - obj.write() - .expect("'obj' write lock in RequestUpdates") - .insert(&reqdata); - assert!(rsp.len() < BLOB_SIZE); - sock.send_to(&rsp, addr) - .expect("'sock.send_to' in RequestUpdates"); - trace!("send_to done!"); + let len = data.len(); + let rsp = Protocol::ReceiveUpdates(from, ups, data); + obj.write().unwrap().insert(&reqdata); + if len < 1 { + let me = obj.read().unwrap(); + trace!( + "no updates me {:?} ix {} since {}", + &me.me[..4], + me.update_index, + v + ); + None + } else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) { + trace!( + "sending updates me {:?} len {} to {:?} {}", + &obj.read().unwrap().me[..4], + len, + &reqdata.id[..4], + addr, + ); + Some(r) + } else { + warn!("to_blob failed"); + None + } } - Protocol::ReceiveUpdates(from, ups, data) => { - trace!("ReceivedUpdates {} from {}", ups, src); + Ok(Protocol::ReceiveUpdates(from, ups, data)) => { + trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len()); obj.write() .expect("'obj' write lock in ReceiveUpdates") .apply_updates(from, ups, &data); + None } - Protocol::RequestWindowIndex(from, ix) => { + Ok(Protocol::RequestWindowIndex(from, ix)) => { //TODO verify from is signed obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); @@ -491,21 +531,54 @@ impl Crdt { me.replicate_addr ); assert_ne!(from.replicate_addr, me.replicate_addr); - let _ = Self::run_window_request(window, sock, &from, ix); + Self::run_window_request(&window, &from, ix, blob_recycler) } + Err(_) => { + warn!("deserialize crdt packet failed"); + None + } + } + } + + /// Process messages from the network + fn run_listen( + obj: &Arc>, + window: &Arc>>>, + blob_recycler: &BlobRecycler, + requests_receiver: &BlobReceiver, + response_sender: &BlobSender, + ) -> Result<()> { + //TODO cache connections + let timeout = Duration::new(1, 0); + let mut reqs = requests_receiver.recv_timeout(timeout)?; + while let Ok(mut more) = 
requests_receiver.try_recv() { + reqs.append(&mut more); + } + let resp: VecDeque<_> = reqs.iter() + .filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap())) + .collect(); + response_sender.send(resp)?; + while let Some(r) = reqs.pop_front() { + blob_recycler.recycle(r); } Ok(()) } pub fn listen( obj: Arc>, window: Arc>>>, - sock: UdpSocket, + blob_recycler: BlobRecycler, + requests_receiver: BlobReceiver, + response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))) - .expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { - let e = Self::run_listen(&obj, &window, &sock); + let e = Self::run_listen( + &obj, + &window, + &blob_recycler, + &requests_receiver, + &response_sender, + ); if e.is_err() { info!( "run_listen timeout, table size: {}", @@ -519,134 +592,57 @@ impl Crdt { } } -#[cfg(test)] -mod tests { - use crdt::{Crdt, ReplicatedData}; - use logger; - use packet::Blob; - use rayon::iter::*; - use signature::KeyPair; - use signature::KeyPairUtil; - use std::net::UdpSocket; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, RwLock}; - use std::thread::{sleep, JoinHandle}; - use std::time::Duration; +pub struct Sockets { + pub gossip: UdpSocket, + pub gossip_send: UdpSocket, + pub requests: UdpSocket, + pub replicate: UdpSocket, + pub transaction: UdpSocket, + pub respond: UdpSocket, + pub broadcast: UdpSocket, +} - fn test_node() -> (Crdt, UdpSocket, UdpSocket, UdpSocket) { +pub struct TestNode { + pub data: ReplicatedData, + pub sockets: Sockets, +} + +impl TestNode { + pub fn new() -> TestNode { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transactions = 
UdpSocket::bind("0.0.0.0:0").unwrap(); + let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( + let data = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - transactions.local_addr().unwrap(), + requests.local_addr().unwrap(), + transaction.local_addr().unwrap(), ); - let crdt = Crdt::new(d); - trace!( - "id: {} gossip: {} replicate: {} serve: {}", - crdt.my_data().id[0], - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - ); - (crdt, gossip, replicate, serve) + TestNode { + data: data, + sockets: Sockets { + gossip, + gossip_send, + requests, + replicate, + transaction, + respond, + broadcast, + }, + } } +} - /// Test that the network converges. - /// Run until every node in the network has a full ReplicatedData set. - /// Check that nodes stop sending updates after all the ReplicatedData has been shared. 
- /// tests that actually use this function are below - fn run_gossip_topo(topo: F) - where - F: Fn(&Vec<(Arc>, JoinHandle<()>)>) -> (), - { - let num: usize = 5; - let exit = Arc::new(AtomicBool::new(false)); - let listen: Vec<_> = (0..num) - .map(|_| { - let (crdt, gossip, _, _) = test_node(); - let c = Arc::new(RwLock::new(crdt)); - let w = Arc::new(RwLock::new(vec![])); - let l = Crdt::listen(c.clone(), w, gossip, exit.clone()); - (c, l) - }) - .collect(); - topo(&listen); - let gossip: Vec<_> = listen - .iter() - .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) - .collect(); - let mut done = true; - for i in 0..(num * 32) { - done = false; - trace!("round {}", i); - for &(ref c, _) in listen.iter() { - if num == c.read().unwrap().convergence() as usize { - done = true; - break; - } - } - //at least 1 node converged - if done == true { - break; - } - sleep(Duration::new(1, 0)); - } - exit.store(true, Ordering::Relaxed); - for j in gossip { - j.join().unwrap(); - } - for (c, j) in listen.into_iter() { - j.join().unwrap(); - // make it clear what failed - // protocol is to chatty, updates should stop after everyone receives `num` - assert!(c.read().unwrap().update_index <= num as u64); - // protocol is not chatty enough, everyone should get `num` entries - assert_eq!(c.read().unwrap().table.len(), num); - } - assert!(done); - } - /// ring a -> b -> c -> d -> e -> a - #[test] - #[ignore] - fn gossip_ring_test() { - logger::setup(); - run_gossip_topo(|listen| { - let num = listen.len(); - for n in 0..num { - let y = n % listen.len(); - let x = (n + 1) % listen.len(); - let mut xv = listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut d = yv.table[&yv.me].clone(); - d.version = 0; - xv.insert(&d); - } - }); - } - - /// star (b,c,d,e) -> a - #[test] - #[ignore] - fn gossip_star_test() { - run_gossip_topo(|listen| { - let num = listen.len(); - for n in 0..(num - 1) { - let x = 0; - let y = (n + 1) % listen.len(); - let mut xv = 
listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut d = yv.table[&yv.me].clone(); - d.version = 0; - xv.insert(&d); - } - }); - } +#[cfg(test)] +mod tests { + use crdt::{Crdt, ReplicatedData}; + use signature::{KeyPair, KeyPairUtil}; /// Test that insert drops messages that are older #[test] @@ -668,77 +664,59 @@ mod tests { crdt.insert(&d); assert_eq!(crdt.table[&d.id].version, 2); } - - #[test] - #[ignore] - pub fn test_crdt_retransmit() { - logger::setup(); - trace!("c1:"); - let (mut c1, s1, r1, e1) = test_node(); - trace!("c2:"); - let (mut c2, s2, r2, _) = test_node(); - trace!("c3:"); - let (mut c3, s3, r3, _) = test_node(); - let c1_id = c1.my_data().id; - c1.set_leader(c1_id); - - c2.insert(&c1.my_data()); - c3.insert(&c1.my_data()); - - c2.set_leader(c1.my_data().id); - c3.set_leader(c1.my_data().id); - - let exit = Arc::new(AtomicBool::new(false)); - - // Create listen threads - let win1 = Arc::new(RwLock::new(vec![])); - let a1 = Arc::new(RwLock::new(c1)); - let t1 = Crdt::listen(a1.clone(), win1, s1, exit.clone()); - - let a2 = Arc::new(RwLock::new(c2)); - let win2 = Arc::new(RwLock::new(vec![])); - let t2 = Crdt::listen(a2.clone(), win2, s2, exit.clone()); - - let a3 = Arc::new(RwLock::new(c3)); - let win3 = Arc::new(RwLock::new(vec![])); - let t3 = Crdt::listen(a3.clone(), win3, s3, exit.clone()); - - // Create gossip threads - let t1_gossip = Crdt::gossip(a1.clone(), exit.clone()); - let t2_gossip = Crdt::gossip(a2.clone(), exit.clone()); - let t3_gossip = Crdt::gossip(a3.clone(), exit.clone()); - - //wait to converge - trace!("waitng to converge:"); - let mut done = false; - for _ in 0..30 { - done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3 - && a3.read().unwrap().table.len() == 3; - if done { - break; - } - sleep(Duration::new(1, 0)); - } - assert!(done); - let mut b = Blob::default(); - b.meta.size = 10; - Crdt::retransmit(&a1, &Arc::new(RwLock::new(b)), &e1).unwrap(); - let res: 
Vec<_> = [r1, r2, r3] - .into_par_iter() - .map(|s| { - let mut b = Blob::default(); - s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let res = s.recv_from(&mut b.data); - res.is_err() //true if failed to receive the retransmit packet - }) - .collect(); - //true if failed receive the retransmit packet, r2, and r3 should succeed - //r1 was the sender, so it should fail to receive the packet - assert_eq!(res, [true, false, false]); - exit.store(true, Ordering::Relaxed); - let threads = vec![t1, t2, t3, t1_gossip, t2_gossip, t3_gossip]; - for t in threads.into_iter() { - t.join().unwrap(); - } + fn sorted(ls: &Vec) -> Vec { + let mut copy: Vec<_> = ls.iter().map(|x| x.clone()).collect(); + copy.sort_by(|x, y| x.id.cmp(&y.id)); + copy } + #[test] + fn update_test() { + let d1 = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + ); + let d2 = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + ); + let d3 = ReplicatedData::new( + KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + "127.0.0.1:1237".parse().unwrap(), + ); + let mut crdt = Crdt::new(d1.clone()); + let (key, ix, ups) = crdt.get_updates_since(0); + assert_eq!(key, d1.id); + assert_eq!(ix, 1); + assert_eq!(ups.len(), 1); + assert_eq!(sorted(&ups), sorted(&vec![d1.clone()])); + crdt.insert(&d2); + let (key, ix, ups) = crdt.get_updates_since(0); + assert_eq!(key, d1.id); + assert_eq!(ix, 2); + assert_eq!(ups.len(), 2); + assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()])); + crdt.insert(&d3); + let (key, ix, ups) = crdt.get_updates_since(0); + assert_eq!(key, d1.id); + assert_eq!(ix, 3); + assert_eq!(ups.len(), 3); + 
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); + let mut crdt2 = Crdt::new(d2.clone()); + crdt2.apply_updates(key, ix, &ups); + assert_eq!(crdt2.table.values().len(), 3); + assert_eq!( + sorted(&crdt2.table.values().map(|x| x.clone()).collect()), + sorted(&crdt.table.values().map(|x| x.clone()).collect()) + ); + } + } diff --git a/src/data_replicator.rs b/src/data_replicator.rs new file mode 100644 index 0000000000..652fe403ee --- /dev/null +++ b/src/data_replicator.rs @@ -0,0 +1,238 @@ +use crdt; +use packet; +use result::Result; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; +use streamer; + +pub struct DataReplicator { + pub thread_hdls: Vec>, +} + +impl DataReplicator { + pub fn new( + crdt: Arc>, + window: Arc>>>, + gossip_listen_socket: UdpSocket, + gossip_send_socket: UdpSocket, + exit: Arc, + ) -> Result { + let blob_recycler = packet::BlobRecycler::default(); + let (request_sender, request_receiver) = channel(); + trace!( + "DataReplicator: id: {:?}, listening on: {:?}", + &crdt.read().unwrap().me[..4], + gossip_listen_socket.local_addr().unwrap() + ); + let t_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + gossip_listen_socket, + request_sender, + )?; + let (response_sender, response_receiver) = channel(); + let t_responder = streamer::responder( + gossip_send_socket, + exit.clone(), + blob_recycler.clone(), + response_receiver, + ); + let t_listen = crdt::Crdt::listen( + crdt.clone(), + window, + blob_recycler.clone(), + request_receiver, + response_sender.clone(), + exit.clone(), + ); + let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); + let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; + Ok(DataReplicator { thread_hdls }) + } +} + +#[cfg(test)] +mod tests { + use crdt::{Crdt, TestNode}; + use data_replicator::DataReplicator; + use logger; + use 
packet::Blob; + use rayon::iter::*; + use std::net::UdpSocket; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + fn test_node(exit: Arc) -> (Arc>, DataReplicator, UdpSocket) { + let tn = TestNode::new(); + let crdt = Crdt::new(tn.data.clone()); + let c = Arc::new(RwLock::new(crdt)); + let w = Arc::new(RwLock::new(vec![])); + let d = DataReplicator::new( + c.clone(), + w, + tn.sockets.gossip, + tn.sockets.gossip_send, + exit, + ).unwrap(); + (c, d, tn.sockets.replicate) + } + + /// Test that the network converges. + /// Run until every node in the network has a full ReplicatedData set. + /// Check that nodes stop sending updates after all the ReplicatedData has been shared. + /// tests that actually use this function are below + fn run_gossip_topo(topo: F) + where + F: Fn(&Vec<(Arc>, DataReplicator, UdpSocket)>) -> (), + { + let num: usize = 5; + let exit = Arc::new(AtomicBool::new(false)); + let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); + topo(&listen); + let mut done = true; + for i in 0..(num * 32) { + done = false; + trace!("round {}", i); + for &(ref c, _, _) in listen.iter() { + if num == c.read().unwrap().convergence() as usize { + done = true; + break; + } + } + //at least 1 node converged + if done == true { + break; + } + sleep(Duration::new(1, 0)); + } + exit.store(true, Ordering::Relaxed); + for (c, dr, _) in listen.into_iter() { + for j in dr.thread_hdls.into_iter() { + j.join().unwrap(); + } + // make it clear what failed + // protocol is too chatty, updates should stop after everyone receives `num` + assert!(c.read().unwrap().update_index <= num as u64); + // protocol is not chatty enough, everyone should get `num` entries + assert_eq!(c.read().unwrap().table.len(), num); + } + assert!(done); + } + /// ring a -> b -> c -> d -> e -> a + #[test] + fn gossip_integration_ring_test() { + logger::setup(); + run_gossip_topo(|listen| { + let num
= listen.len(); + for n in 0..num { + let y = n % listen.len(); + let x = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut d = yv.table[&yv.me].clone(); + d.version = 0; + xv.insert(&d); + } + }); + } + + /// star a -> (b,c,d,e) + #[test] + fn gossip_integration_star_test() { + logger::setup(); + run_gossip_topo(|listen| { + let num = listen.len(); + for n in 0..(num - 1) { + let x = 0; + let y = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut yd = yv.table[&yv.me].clone(); + yd.version = 0; + xv.insert(&yd); + trace!("star leader {:?}", &xv.me[..4]); + } + }); + } + + /// rstar a <- (b,c,d,e) + #[test] + fn gossip_integration_rstar_test() { + logger::setup(); + run_gossip_topo(|listen| { + let num = listen.len(); + let xd = { + let xv = listen[0].0.read().unwrap(); + xv.table[&xv.me].clone() + }; + trace!("rstar leader {:?}", &xd.id[..4]); + for n in 0..(num - 1) { + let y = (n + 1) % listen.len(); + let mut yv = listen[y].0.write().unwrap(); + yv.insert(&xd); + trace!("rstar insert {:?} into {:?}", &xd.id[..4], &yv.me[..4]); + } + }); + } + + #[test] + pub fn test_crdt_retransmit() { + logger::setup(); + let exit = Arc::new(AtomicBool::new(false)); + trace!("c1:"); + let (c1, dr1, tn1) = test_node(exit.clone()); + trace!("c2:"); + let (c2, dr2, tn2) = test_node(exit.clone()); + trace!("c3:"); + let (c3, dr3, tn3) = test_node(exit.clone()); + let c1_data = c1.read().unwrap().my_data().clone(); + c1.write().unwrap().set_leader(c1_data.id); + + c2.write().unwrap().insert(&c1_data); + c3.write().unwrap().insert(&c1_data); + + c2.write().unwrap().set_leader(c1_data.id); + c3.write().unwrap().set_leader(c1_data.id); + + //wait to converge + trace!("waiting to converge:"); + let mut done = false; + for _ in 0..30 { + done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3 + && 
c3.read().unwrap().table.len() == 3; + if done { + break; + } + sleep(Duration::new(1, 0)); + } + assert!(done); + let mut b = Blob::default(); + b.meta.size = 10; + Crdt::retransmit(&c1, &Arc::new(RwLock::new(b)), &tn1).unwrap(); + let res: Vec<_> = [tn1, tn2, tn3] + .into_par_iter() + .map(|s| { + let mut b = Blob::default(); + s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let res = s.recv_from(&mut b.data); + res.is_err() //true if failed to receive the retransmit packet + }) + .collect(); + //true if failed receive the retransmit packet, r2, and r3 should succeed + //r1 was the sender, so it should fail to receive the packet + assert_eq!(res, [true, false, false]); + exit.store(true, Ordering::Relaxed); + let mut threads = vec![]; + threads.extend(dr1.thread_hdls.into_iter()); + threads.extend(dr2.thread_hdls.into_iter()); + threads.extend(dr3.thread_hdls.into_iter()); + for t in threads.into_iter() { + t.join().unwrap(); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index cb6e975f61..dcd26f3850 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod bank; pub mod banking_stage; pub mod budget; pub mod crdt; +pub mod data_replicator; pub mod entry; pub mod entry_writer; #[cfg(feature = "erasure")] diff --git a/src/packet.rs b/src/packet.rs index 5fdcab1acb..4441565ef4 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -180,10 +180,10 @@ impl Packets { socket.set_nonblocking(false)?; for p in &mut self.packets { p.meta.size = 0; - trace!("receiving"); + trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - debug!("got {:?} messages", i); + debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } Err(e) => { @@ -250,6 +250,7 @@ pub fn to_blob( // the raw bytes are being serialized and sent, this isn't the // right interface, and we should create a separate path for // sending request responses in the RPU + assert!(len < BLOB_SIZE); 
b.data[..len].copy_from_slice(&v); b.meta.size = len; b.meta.set_addr(&rsp_addr); @@ -283,7 +284,8 @@ impl Blob { self.data[..BLOB_INDEX_END].clone_from_slice(&wtr); Ok(()) } - + /// sender id, we use this for identifying if it's a blob from the leader that we should + /// retransmit. eventually blobs should have a signature that we can use for spam filtering pub fn get_id(&self) -> Result { let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?; Ok(e) @@ -317,9 +319,10 @@ impl Blob { let r = re.allocate(); { let mut p = r.write().expect("'r' write lock in pub fn recv_from"); + trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - trace!("got {:?} messages", i); + trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } Err(e) => { diff --git a/src/server.rs b/src/server.rs index 5c28a6282f..1a85320cbd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; +use data_replicator::DataReplicator; use hash::Hash; use packet; use rpu::Rpu; @@ -51,19 +52,26 @@ impl Server { thread_hdls.extend(tpu.thread_hdls); let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone()); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let data_replicator = DataReplicator::new( + crdt.clone(), + window.clone(), + gossip_socket, + gossip_send_socket, + exit.clone(), + ).expect("DataReplicator::new"); + thread_hdls.extend(data_replicator.thread_hdls); let t_broadcast = streamer::broadcaster( broadcast_socket, exit.clone(), - crdt.clone(), + crdt, window, blob_recycler.clone(), tpu.blob_receiver, ); - thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]); + thread_hdls.extend(vec![t_broadcast]); Server { thread_hdls } } diff --git
a/src/streamer.rs b/src/streamer.rs index ef5b399e63..d069eef479 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -106,6 +106,7 @@ pub fn responder( //TODO, we would need to stick block authentication before we create the //window. fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> { + trace!("receiving on {}", sock.local_addr().unwrap()); let dq = Blob::recv_from(recycler, sock)?; if !dq.is_empty() { s.send(dq)?; @@ -584,7 +585,6 @@ mod bench { #[cfg(test)] mod test { use crdt::{Crdt, ReplicatedData}; - use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use signature::KeyPair; use signature::KeyPairUtil; @@ -595,10 +595,9 @@ mod test { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; - use std::thread::sleep; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, retransmitter, window}; use streamer::{default_window, BlobReceiver, PacketReceiver}; + use streamer::{blob_receiver, receiver, responder, window}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { @@ -735,111 +734,4 @@ mod test { t_responder.join().expect("join"); t_window.join().expect("join"); } - - fn test_node() -> (Arc>, UdpSocket, UdpSocket, UdpSocket) { - let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); - let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); - let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); - let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), - transaction.local_addr().unwrap(), - ); - trace!("data: {:?}", d); - let crdt = Crdt::new(d); - (Arc::new(RwLock::new(crdt)), gossip, replicate, serve) - } - - #[test] - #[ignore] - //retransmit from leader to replicate target - pub fn retransmit() { - 
logger::setup(); - trace!("retransmit test start"); - let exit = Arc::new(AtomicBool::new(false)); - let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node(); - let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node(); - let leader_data = crdt_leader.read().unwrap().my_data().clone(); - crdt_leader.write().unwrap().insert(&leader_data); - crdt_leader.write().unwrap().set_leader(leader_data.id); - let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone()); - let window_leader = Arc::new(RwLock::new(vec![])); - let t_crdt_leader_l = Crdt::listen( - crdt_leader.clone(), - window_leader, - sock_gossip_leader, - exit.clone(), - ); - - crdt_target.write().unwrap().insert(&leader_data); - crdt_target.write().unwrap().set_leader(leader_data.id); - let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone()); - let window_target = Arc::new(RwLock::new(vec![])); - let t_crdt_target_l = Crdt::listen( - crdt_target.clone(), - window_target, - sock_gossip_target, - exit.clone(), - ); - //leader retransmitter - let (s_retransmit, r_retransmit) = channel(); - let blob_recycler = BlobRecycler::default(); - let saddr = sock_leader.local_addr().unwrap(); - let t_retransmit = retransmitter( - sock_leader, - exit.clone(), - crdt_leader.clone(), - blob_recycler.clone(), - r_retransmit, - ); - - //target receiver - let (s_blob_receiver, r_blob_receiver) = channel(); - let t_receiver = blob_receiver( - exit.clone(), - blob_recycler.clone(), - sock_replicate_target, - s_blob_receiver, - ).unwrap(); - for _ in 0..10 { - let done = crdt_target.read().unwrap().update_index == 2 - && crdt_leader.read().unwrap().update_index == 2; - if done { - break; - } - let timer = Duration::new(1, 0); - sleep(timer); - } - - //send the data through - let mut bq = VecDeque::new(); - let b = blob_recycler.allocate(); - b.write().unwrap().meta.size = 10; - bq.push_back(b); - s_retransmit.send(bq).unwrap(); - let timer = Duration::new(5, 0); - 
trace!("Waiting for timeout"); - let mut oq = r_blob_receiver.recv_timeout(timer).unwrap(); - assert_eq!(oq.len(), 1); - let o = oq.pop_front().unwrap(); - let ro = o.read().unwrap(); - assert_eq!(ro.meta.size, 10); - assert_eq!(ro.meta.addr(), saddr); - exit.store(true, Ordering::Relaxed); - let threads = vec![ - t_receiver, - t_retransmit, - t_crdt_target_g, - t_crdt_target_l, - t_crdt_leader_g, - t_crdt_leader_l, - ]; - for t in threads { - t.join().unwrap(); - } - } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 7e20682cc0..22c624ddb4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -177,18 +177,18 @@ mod tests { use super::*; use bank::Bank; use budget::Budget; + use crdt::TestNode; use futures::Future; use logger; use mint::Mint; use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::sleep; use std::time::Duration; use transaction::{Instruction, Plan}; - use tvu::TestNode; #[test] fn test_thin_client() { diff --git a/src/tpu.rs b/src/tpu.rs index 9737f27fdf..9e2e42c32e 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -56,7 +56,6 @@ impl Tpu { Mutex::new(writer), record_stage.entry_receiver, ); - let mut thread_hdls = vec![ fetch_stage.thread_hdl, banking_stage.thread_hdl, diff --git a/src/tvu.rs b/src/tvu.rs index 8d4ac27c86..79e8d08ff2 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -22,9 +22,9 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData}; +use data_replicator::DataReplicator; use packet; use replicate_stage::ReplicateStage; -use signature::{KeyPair, KeyPairUtil}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -49,7 +49,7 @@ impl Tvu { pub fn new( bank: Arc, me: ReplicatedData, - gossip: UdpSocket, + gossip_listen_socket: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, @@ -62,9 +62,15 @@ impl Tvu { crdt.write() 
.expect("'crdt' write lock before insert() in pub fn replicate") .insert(&leader); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let data_replicator = DataReplicator::new( + crdt.clone(), + window.clone(), + gossip_listen_socket, + gossip_send_socket, + exit.clone(), + ).expect("DataReplicator::new"); // TODO pull this socket out through the public interface // make sure we are on the same interface @@ -111,108 +117,52 @@ impl Tvu { blob_recycler.clone(), ); - let threads = vec![ + let mut threads = vec![ //replicate threads t_blob_receiver, t_retransmit, t_window, replicate_stage.thread_hdl, - t_gossip, - t_listen, ]; + threads.extend(data_replicator.thread_hdls.into_iter()); Tvu { thread_hdls: threads, } } } -pub struct Sockets { - pub gossip: UdpSocket, - pub requests: UdpSocket, - pub replicate: UdpSocket, - pub transaction: UdpSocket, - pub respond: UdpSocket, - pub broadcast: UdpSocket, -} - -pub struct TestNode { - pub data: ReplicatedData, - pub sockets: Sockets, -} - -impl TestNode { - pub fn new() -> TestNode { - let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); - let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); - let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let data = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests.local_addr().unwrap(), - transaction.local_addr().unwrap(), - ); - TestNode { - data: data, - sockets: Sockets { - gossip, - requests, - replicate, - transaction, - respond, - broadcast, - }, - } - } -} - -#[cfg(test)] -pub fn test_node() -> 
(ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { - use signature::{KeyPair, KeyPairUtil}; - use std::time::Duration; - - let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); - let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); - let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests_socket.local_addr().unwrap(), - transactions_socket.local_addr().unwrap(), - ); - (d, gossip, replicate, requests_socket, transactions_socket) -} - #[cfg(test)] pub mod tests { use bank::Bank; use bincode::serialize; - use crdt::Crdt; + use crdt::{Crdt, TestNode}; + use data_replicator::DataReplicator; use entry::Entry; use hash::{hash, Hash}; use logger; use mint::Mint; use packet::BlobRecycler; + use result::Result; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; use transaction::Transaction; - use tvu::{TestNode, Tvu}; + use tvu::Tvu; + fn new_replicator( + crdt: Arc>, + listen: UdpSocket, + exit: Arc, + ) -> Result { + let window = streamer::default_window(); + let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + DataReplicator::new(crdt, window, listen, send_sock, exit) + } /// Test that message sent from leader to target1 and replicated to target2 #[test] fn test_replicate() { @@ -227,9 +177,7 @@ pub mod tests { crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); - let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); - let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, 
leader.sockets.gossip, exit.clone()); + let dr_l = new_replicator(cref_l, leader.sockets.gossip, exit.clone()).unwrap(); //start crdt2 let mut crdt2 = Crdt::new(target2.data.clone()); @@ -237,9 +185,7 @@ pub mod tests { crdt2.set_leader(leader.data.id); let leader_id = leader.data.id; let cref2 = Arc::new(RwLock::new(crdt2)); - let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); - let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone()); + let dr_2 = new_replicator(cref2, target2.sockets.gossip, exit.clone()).unwrap(); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -337,11 +283,13 @@ pub mod tests { for t in tvu.thread_hdls { t.join().expect("join"); } - t2_gossip.join().expect("join"); - t2_listen.join().expect("join"); + for t in dr_l.thread_hdls { + t.join().expect("join"); + } + for t in dr_2.thread_hdls { + t.join().expect("join"); + } t_receiver.join().expect("join"); t_responder.join().expect("join"); - t_l_gossip.join().expect("join"); - t_l_listen.join().expect("join"); } } diff --git a/tests/multinode.rs b/tests/multinode.rs index be4c469a7b..cdadd88ba1 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,14 +6,15 @@ extern crate solana; use futures::Future; use solana::bank::Bank; +use solana::crdt::TestNode; use solana::crdt::{Crdt, ReplicatedData}; +use solana::data_replicator::DataReplicator; use solana::logger; use solana::mint::Mint; use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil, PublicKey}; use solana::streamer::default_window; use solana::thin_client::ThinClient; -use solana::tvu::TestNode; use std::io; use std::io::sink; use std::net::UdpSocket; @@ -59,16 +60,15 @@ fn converge( let mut spy_crdt = Crdt::new(spy.data); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); - let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); 
- let t_spy_listen = Crdt::listen( + let dr = DataReplicator::new( spy_ref.clone(), spy_window, spy.sockets.gossip, - exit.clone(), - ); - let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + spy.sockets.gossip_send, + exit, + ).unwrap(); //wait for the network to converge let mut converged = false; for _ in 0..30 { @@ -80,8 +80,7 @@ fn converge( sleep(Duration::new(1, 0)); } assert!(converged); - threads.push(t_spy_listen); - threads.push(t_spy_gossip); + threads.extend(dr.thread_hdls.into_iter()); let v: Vec = spy_ref .read() .unwrap()