From 15584e70621fde03b1c845d1e745c19e9e3b6bd8 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 26 Apr 2018 13:48:42 -0700 Subject: [PATCH 1/3] recover full network from a star --- src/crdt.rs | 94 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 69de86828d..00ba3e39f5 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -85,7 +85,10 @@ pub struct Crdt { // TODO These messages should be signed, and go through the gpu pipeline for spam filtering #[derive(Serialize, Deserialize)] enum Protocol { - RequestUpdates(u64, SocketAddr), + /// forward your own latest data structure when requesting an update + /// this doesn't update the `remote` update index, but it allows the + /// recepient of this request to add knowledge of this node to the network + RequestUpdates(u64, ReplicatedData), //TODO might need a since? /// from id, form's last update index, ReplicatedData ReceiveUpdates(PublicKey, u64, Vec), @@ -106,6 +109,13 @@ impl Crdt { g.table.insert(me.id, me); g } + pub fn import(&mut self, v: &ReplicatedData) { + // TODO check that last_verified types are always increasing + // TODO probably an error or attack + if self.me != v.id { + self.insert(v); + } + } pub fn insert(&mut self, v: &ReplicatedData) { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { trace!("insert! {}", v.version); @@ -141,13 +151,13 @@ impl Crdt { /// * A - Remote gossip address /// * B - My gossip address /// * C - Remote update index to request updates since - fn gossip_request(&self) -> (SocketAddr, SocketAddr, u64) { + fn gossip_request(&self) -> (SocketAddr, Protocol) { let n = (Self::random() as usize) % self.table.len(); trace!("random {:?} {}", &self.me[0..1], n); let v = self.table.values().nth(n).unwrap().clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); - let my_addr = self.table[&self.me].gossip_addr; - (v.gossip_addr, my_addr, remote_update_index) + let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); + (v.gossip_addr, req) } /// At random pick a node and try to get updated changes from them @@ -157,14 +167,13 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, my_addr, remote_update_index) = - obj.read().unwrap().gossip_request(); - let mut req_addr = my_addr; + let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request(); + let mut req_addr = remote_gossip_addr; req_addr.set_port(0); let sock = UdpSocket::bind(req_addr)?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have - let r = serialize(&Protocol::RequestUpdates(remote_update_index, my_addr))?; + let r = serialize(&req)?; sock.send_to(&r, remote_gossip_addr)?; Ok(()) } @@ -174,17 +183,12 @@ impl Crdt { /// * `from` - identity of the sender of the updates /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents /// * `data` - the update data - fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: Vec) { + fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) { trace!("got updates {}", data.len()); // TODO we need to punish/spam resist here // sig verify the whole update and slash anyone who sends a bad update for v in data { - // TODO probably an error or attack - if v.id == self.me { - continue; - } - // TODO check that last_verified types are always increasing - self.insert(&v); + self.import(&v); } *self.remote.entry(from).or_insert(update_index) = update_index; } @@ -211,19 +215,22 @@ impl Crdt { let r = deserialize(&buf)?; match r { // TODO sigverify these - Protocol::RequestUpdates(v, addr) => { + Protocol::RequestUpdates(v, reqdata) => { trace!("RequestUpdates {}", v); + let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = obj.read().unwrap().get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); + //TODO verify reqdata belongs to sender + obj.write().unwrap().import(&reqdata); sock.send_to(&rsp, addr).unwrap(); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().unwrap().apply_updates(from, ups, data); + obj.write().unwrap().apply_updates(from, ups, &data); } } Ok(()) @@ -251,15 +258,16 @@ mod test { use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; - use std::thread::sleep; + use std::thread::{sleep, JoinHandle}; use std::time::Duration; /// Test that the network converges. - /// Create a ring a -> b -> c -> d -> e -> a of size num. /// Run until every node in the network has a full ReplicatedData set. /// Check that nodes stop sending updates after all the ReplicatedData has been shared. - #[test] - fn gossip_test() { + fn run_gossip_topo(topo: F) + where + F: Fn(&Vec<(Arc>, JoinHandle<()>)>) -> (), + { let num: usize = 5; let exit = Arc::new(AtomicBool::new(false)); let listen: Vec<_> = (0..num) @@ -273,15 +281,7 @@ mod test { (c, l) }) .collect(); - for n in 0..num { - let y = n % listen.len(); - let x = (n + 1) % listen.len(); - let mut xv = listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut d = yv.table[&yv.me].clone(); - d.version = 0; - xv.insert(&d); - } + topo(&listen); let gossip: Vec<_> = listen .iter() .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) @@ -321,6 +321,40 @@ mod test { } assert!(done); } + /// ring a -> b -> c -> d -> e -> a + #[test] + fn gossip_ring_test() { + run_gossip_topo(|listen| { + let num = listen.len(); + for n in 0..num { + let y = n % listen.len(); + let x = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut d = yv.table[&yv.me].clone(); + d.version = 0; + xv.insert(&d); + } + }); + } + + /// star (b,c,d,e) -> a + #[test] + fn gossip_star_test() { + run_gossip_topo(|listen| { + let num = listen.len(); + for n in 0..(num - 1) { + let x = 0; + let y = (n + 1) % listen.len(); + let mut xv = listen[x].0.write().unwrap(); + let yv = listen[y].0.read().unwrap(); + let mut d = yv.table[&yv.me].clone(); + d.version = 0; + xv.insert(&d); + } + }); + } + /// Test that insert drops messages that are older #[test] fn insert_test() { From 48018b3f5b88a41afe0222d13794e0b7b79c50e4 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 26 Apr 2018 13:50:57 -0700 Subject: [PATCH 2/3] docs --- src/crdt.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/crdt.rs b/src/crdt.rs index 00ba3e39f5..0b33e387a8 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -264,6 +264,7 @@ mod test { /// Test that the network converges. /// Run until every node in the network has a full ReplicatedData set. /// Check that nodes stop sending updates after all the ReplicatedData has been shared. + /// tests that actually use this function are below fn run_gossip_topo(topo: F) where F: Fn(&Vec<(Arc>, JoinHandle<()>)>) -> (), From d90ab901453a8e07bae5dde5a9f7963dbc23f71a Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 26 Apr 2018 13:54:29 -0700 Subject: [PATCH 3/3] bind to all --- src/crdt.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 0b33e387a8..b7742d5cd3 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -168,9 +168,7 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request(); - let mut req_addr = remote_gossip_addr; - req_addr.set_port(0); - let sock = UdpSocket::bind(req_addr)?; + let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have let r = serialize(&req)?;