Merge pull request #152 from aeyakovenko/star
recover full network from a star
This commit is contained in:
commit
266f85f607
97
src/crdt.rs
97
src/crdt.rs
|
@ -85,7 +85,10 @@ pub struct Crdt {
|
||||||
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
|
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
enum Protocol {
|
enum Protocol {
|
||||||
RequestUpdates(u64, SocketAddr),
|
/// forward your own latest data structure when requesting an update
|
||||||
|
/// this doesn't update the `remote` update index, but it allows the
|
||||||
|
/// recepient of this request to add knowledge of this node to the network
|
||||||
|
RequestUpdates(u64, ReplicatedData),
|
||||||
//TODO might need a since?
|
//TODO might need a since?
|
||||||
/// from id, form's last update index, ReplicatedData
|
/// from id, form's last update index, ReplicatedData
|
||||||
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
|
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
|
||||||
|
@ -106,6 +109,13 @@ impl Crdt {
|
||||||
g.table.insert(me.id, me);
|
g.table.insert(me.id, me);
|
||||||
g
|
g
|
||||||
}
|
}
|
||||||
|
pub fn import(&mut self, v: &ReplicatedData) {
|
||||||
|
// TODO check that last_verified types are always increasing
|
||||||
|
// TODO probably an error or attack
|
||||||
|
if self.me != v.id {
|
||||||
|
self.insert(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn insert(&mut self, v: &ReplicatedData) {
|
pub fn insert(&mut self, v: &ReplicatedData) {
|
||||||
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
||||||
trace!("insert! {}", v.version);
|
trace!("insert! {}", v.version);
|
||||||
|
@ -141,13 +151,13 @@ impl Crdt {
|
||||||
/// * A - Remote gossip address
|
/// * A - Remote gossip address
|
||||||
/// * B - My gossip address
|
/// * B - My gossip address
|
||||||
/// * C - Remote update index to request updates since
|
/// * C - Remote update index to request updates since
|
||||||
fn gossip_request(&self) -> (SocketAddr, SocketAddr, u64) {
|
fn gossip_request(&self) -> (SocketAddr, Protocol) {
|
||||||
let n = (Self::random() as usize) % self.table.len();
|
let n = (Self::random() as usize) % self.table.len();
|
||||||
trace!("random {:?} {}", &self.me[0..1], n);
|
trace!("random {:?} {}", &self.me[0..1], n);
|
||||||
let v = self.table.values().nth(n).unwrap().clone();
|
let v = self.table.values().nth(n).unwrap().clone();
|
||||||
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
||||||
let my_addr = self.table[&self.me].gossip_addr;
|
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
||||||
(v.gossip_addr, my_addr, remote_update_index)
|
(v.gossip_addr, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// At random pick a node and try to get updated changes from them
|
/// At random pick a node and try to get updated changes from them
|
||||||
|
@ -157,14 +167,11 @@ impl Crdt {
|
||||||
|
|
||||||
// Lock the object only to do this operation and not for any longer
|
// Lock the object only to do this operation and not for any longer
|
||||||
// especially not when doing the `sock.send_to`
|
// especially not when doing the `sock.send_to`
|
||||||
let (remote_gossip_addr, my_addr, remote_update_index) =
|
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request();
|
||||||
obj.read().unwrap().gossip_request();
|
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||||
let mut req_addr = my_addr;
|
|
||||||
req_addr.set_port(0);
|
|
||||||
let sock = UdpSocket::bind(req_addr)?;
|
|
||||||
// TODO this will get chatty, so we need to first ask for number of updates since
|
// TODO this will get chatty, so we need to first ask for number of updates since
|
||||||
// then only ask for specific data that we dont have
|
// then only ask for specific data that we dont have
|
||||||
let r = serialize(&Protocol::RequestUpdates(remote_update_index, my_addr))?;
|
let r = serialize(&req)?;
|
||||||
sock.send_to(&r, remote_gossip_addr)?;
|
sock.send_to(&r, remote_gossip_addr)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -174,17 +181,12 @@ impl Crdt {
|
||||||
/// * `from` - identity of the sender of the updates
|
/// * `from` - identity of the sender of the updates
|
||||||
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
|
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
|
||||||
/// * `data` - the update data
|
/// * `data` - the update data
|
||||||
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: Vec<ReplicatedData>) {
|
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
|
||||||
trace!("got updates {}", data.len());
|
trace!("got updates {}", data.len());
|
||||||
// TODO we need to punish/spam resist here
|
// TODO we need to punish/spam resist here
|
||||||
// sig verify the whole update and slash anyone who sends a bad update
|
// sig verify the whole update and slash anyone who sends a bad update
|
||||||
for v in data {
|
for v in data {
|
||||||
// TODO probably an error or attack
|
self.import(&v);
|
||||||
if v.id == self.me {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// TODO check that last_verified types are always increasing
|
|
||||||
self.insert(&v);
|
|
||||||
}
|
}
|
||||||
*self.remote.entry(from).or_insert(update_index) = update_index;
|
*self.remote.entry(from).or_insert(update_index) = update_index;
|
||||||
}
|
}
|
||||||
|
@ -211,19 +213,22 @@ impl Crdt {
|
||||||
let r = deserialize(&buf)?;
|
let r = deserialize(&buf)?;
|
||||||
match r {
|
match r {
|
||||||
// TODO sigverify these
|
// TODO sigverify these
|
||||||
Protocol::RequestUpdates(v, addr) => {
|
Protocol::RequestUpdates(v, reqdata) => {
|
||||||
trace!("RequestUpdates {}", v);
|
trace!("RequestUpdates {}", v);
|
||||||
|
let addr = reqdata.gossip_addr;
|
||||||
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
|
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
|
||||||
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
|
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
|
||||||
trace!("get updates since response {} {}", v, data.len());
|
trace!("get updates since response {} {}", v, data.len());
|
||||||
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
||||||
trace!("send_to {}", addr);
|
trace!("send_to {}", addr);
|
||||||
|
//TODO verify reqdata belongs to sender
|
||||||
|
obj.write().unwrap().import(&reqdata);
|
||||||
sock.send_to(&rsp, addr).unwrap();
|
sock.send_to(&rsp, addr).unwrap();
|
||||||
trace!("send_to done!");
|
trace!("send_to done!");
|
||||||
}
|
}
|
||||||
Protocol::ReceiveUpdates(from, ups, data) => {
|
Protocol::ReceiveUpdates(from, ups, data) => {
|
||||||
trace!("ReceivedUpdates");
|
trace!("ReceivedUpdates");
|
||||||
obj.write().unwrap().apply_updates(from, ups, data);
|
obj.write().unwrap().apply_updates(from, ups, &data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -251,15 +256,17 @@ mod test {
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::sleep;
|
use std::thread::{sleep, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
/// Test that the network converges.
|
/// Test that the network converges.
|
||||||
/// Create a ring a -> b -> c -> d -> e -> a of size num.
|
|
||||||
/// Run until every node in the network has a full ReplicatedData set.
|
/// Run until every node in the network has a full ReplicatedData set.
|
||||||
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
|
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
|
||||||
#[test]
|
/// tests that actually use this function are below
|
||||||
fn gossip_test() {
|
fn run_gossip_topo<F>(topo: F)
|
||||||
|
where
|
||||||
|
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
|
||||||
|
{
|
||||||
let num: usize = 5;
|
let num: usize = 5;
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let listen: Vec<_> = (0..num)
|
let listen: Vec<_> = (0..num)
|
||||||
|
@ -273,15 +280,7 @@ mod test {
|
||||||
(c, l)
|
(c, l)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
for n in 0..num {
|
topo(&listen);
|
||||||
let y = n % listen.len();
|
|
||||||
let x = (n + 1) % listen.len();
|
|
||||||
let mut xv = listen[x].0.write().unwrap();
|
|
||||||
let yv = listen[y].0.read().unwrap();
|
|
||||||
let mut d = yv.table[&yv.me].clone();
|
|
||||||
d.version = 0;
|
|
||||||
xv.insert(&d);
|
|
||||||
}
|
|
||||||
let gossip: Vec<_> = listen
|
let gossip: Vec<_> = listen
|
||||||
.iter()
|
.iter()
|
||||||
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
|
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
|
||||||
|
@ -321,6 +320,40 @@ mod test {
|
||||||
}
|
}
|
||||||
assert!(done);
|
assert!(done);
|
||||||
}
|
}
|
||||||
|
/// ring a -> b -> c -> d -> e -> a
|
||||||
|
#[test]
|
||||||
|
fn gossip_ring_test() {
|
||||||
|
run_gossip_topo(|listen| {
|
||||||
|
let num = listen.len();
|
||||||
|
for n in 0..num {
|
||||||
|
let y = n % listen.len();
|
||||||
|
let x = (n + 1) % listen.len();
|
||||||
|
let mut xv = listen[x].0.write().unwrap();
|
||||||
|
let yv = listen[y].0.read().unwrap();
|
||||||
|
let mut d = yv.table[&yv.me].clone();
|
||||||
|
d.version = 0;
|
||||||
|
xv.insert(&d);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// star (b,c,d,e) -> a
|
||||||
|
#[test]
|
||||||
|
fn gossip_star_test() {
|
||||||
|
run_gossip_topo(|listen| {
|
||||||
|
let num = listen.len();
|
||||||
|
for n in 0..(num - 1) {
|
||||||
|
let x = 0;
|
||||||
|
let y = (n + 1) % listen.len();
|
||||||
|
let mut xv = listen[x].0.write().unwrap();
|
||||||
|
let yv = listen[y].0.read().unwrap();
|
||||||
|
let mut d = yv.table[&yv.me].clone();
|
||||||
|
d.version = 0;
|
||||||
|
xv.insert(&d);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Test that insert drops messages that are older
|
/// Test that insert drops messages that are older
|
||||||
#[test]
|
#[test]
|
||||||
fn insert_test() {
|
fn insert_test() {
|
||||||
|
|
Loading…
Reference in New Issue