2018-04-21 11:02:49 -07:00
|
|
|
//! The `crdt` module defines a data structure that is shared by all the nodes in the network over
|
2018-04-28 00:31:20 -07:00
|
|
|
//! a gossip control plane. The goal is to share small bits of off-chain information and detect and
|
2018-04-21 11:02:49 -07:00
|
|
|
//! repair partitions.
|
|
|
|
//!
|
|
|
|
//! This CRDT only supports a very limited set of types. A map of PublicKey -> Versioned Struct.
|
|
|
|
//! The last version is always picked durring an update.
|
2018-04-28 00:31:20 -07:00
|
|
|
//!
|
|
|
|
//! The network is arranged in layers:
|
|
|
|
//!
|
|
|
|
//! * layer 0 - Leader.
|
|
|
|
//! * layer 1 - As many nodes as we can fit
|
|
|
|
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
|
|
|
|
//!
|
|
|
|
//! Accountant needs to provide an interface for us to query the stake weight
|
2018-04-21 11:02:49 -07:00
|
|
|
|
|
|
|
use bincode::{deserialize, serialize};
|
|
|
|
use byteorder::{LittleEndian, ReadBytesExt};
|
|
|
|
use hash::Hash;
|
2018-04-28 00:31:20 -07:00
|
|
|
use packet::SharedBlob;
|
|
|
|
use rayon::prelude::*;
|
|
|
|
use result::{Error, Result};
|
2018-04-21 11:02:49 -07:00
|
|
|
use ring::rand::{SecureRandom, SystemRandom};
|
|
|
|
use signature::{PublicKey, Signature};
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::io::Cursor;
|
|
|
|
use std::net::{SocketAddr, UdpSocket};
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
use std::sync::{Arc, RwLock};
|
|
|
|
use std::thread::{sleep, spawn, JoinHandle};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
/// Structure to be replicated by the network
|
|
|
|
#[derive(Serialize, Deserialize, Clone)]
|
|
|
|
pub struct ReplicatedData {
|
2018-04-28 00:31:20 -07:00
|
|
|
pub id: PublicKey,
|
2018-04-21 11:02:49 -07:00
|
|
|
sig: Signature,
|
|
|
|
/// should always be increasing
|
|
|
|
version: u64,
|
|
|
|
/// address to connect to for gossip
|
2018-04-28 00:31:20 -07:00
|
|
|
pub gossip_addr: SocketAddr,
|
2018-04-21 11:02:49 -07:00
|
|
|
/// address to connect to for replication
|
2018-04-28 00:31:20 -07:00
|
|
|
pub replicate_addr: SocketAddr,
|
2018-04-21 11:02:49 -07:00
|
|
|
/// address to connect to when this node is leader
|
2018-04-28 00:31:20 -07:00
|
|
|
pub serve_addr: SocketAddr,
|
2018-04-21 11:02:49 -07:00
|
|
|
/// current leader identity
|
|
|
|
current_leader_id: PublicKey,
|
|
|
|
/// last verified hash that was submitted to the leader
|
|
|
|
last_verified_hash: Hash,
|
|
|
|
/// last verified count, always increasing
|
|
|
|
last_verified_count: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ReplicatedData {
|
2018-04-28 00:31:20 -07:00
|
|
|
pub fn new(
|
|
|
|
id: PublicKey,
|
|
|
|
gossip_addr: SocketAddr,
|
|
|
|
replicate_addr: SocketAddr,
|
|
|
|
serve_addr: SocketAddr,
|
|
|
|
) -> ReplicatedData {
|
2018-04-21 11:02:49 -07:00
|
|
|
ReplicatedData {
|
|
|
|
id,
|
|
|
|
sig: Signature::default(),
|
|
|
|
version: 0,
|
|
|
|
gossip_addr,
|
2018-04-28 00:31:20 -07:00
|
|
|
replicate_addr,
|
|
|
|
serve_addr,
|
2018-04-21 11:02:49 -07:00
|
|
|
current_leader_id: PublicKey::default(),
|
|
|
|
last_verified_hash: Hash::default(),
|
|
|
|
last_verified_count: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// `Crdt` structure keeps a table of `ReplicatedData` structs
|
|
|
|
/// # Properties
|
|
|
|
/// * `table` - map of public id's to versioned and signed ReplicatedData structs
|
|
|
|
/// * `local` - map of public id's to what `self.update_index` `self.table` was updated
|
|
|
|
/// * `remote` - map of public id's to the `remote.update_index` was sent
|
|
|
|
/// * `update_index` - my update index
|
|
|
|
/// # Remarks
|
|
|
|
/// This implements two services, `gossip` and `listen`.
|
|
|
|
/// * `gossip` - asynchronously ask nodes to send updates
|
|
|
|
/// * `listen` - listen for requests and responses
|
|
|
|
/// No attempt to keep track of timeouts or dropped requests is made, or should be.
|
|
|
|
pub struct Crdt {
|
|
|
|
table: HashMap<PublicKey, ReplicatedData>,
|
|
|
|
/// Value of my update index when entry in table was updated.
|
|
|
|
/// Nodes will ask for updates since `update_index`, and this node
|
|
|
|
/// should respond with all the identities that are greater then the
|
|
|
|
/// request's `update_index` in this list
|
|
|
|
local: HashMap<PublicKey, u64>,
|
|
|
|
/// The value of the remote update index that i have last seen
|
|
|
|
/// This Node will ask external nodes for updates since the value in this list
|
2018-05-04 11:11:39 -07:00
|
|
|
pub remote: HashMap<PublicKey, u64>,
|
2018-04-28 00:31:20 -07:00
|
|
|
pub update_index: u64,
|
2018-04-21 11:02:49 -07:00
|
|
|
me: PublicKey,
|
|
|
|
timeout: Duration,
|
|
|
|
}
|
|
|
|
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
enum Protocol {
|
2018-04-26 13:48:42 -07:00
|
|
|
/// forward your own latest data structure when requesting an update
|
|
|
|
/// this doesn't update the `remote` update index, but it allows the
|
|
|
|
/// recepient of this request to add knowledge of this node to the network
|
|
|
|
RequestUpdates(u64, ReplicatedData),
|
2018-04-21 11:02:49 -07:00
|
|
|
//TODO might need a since?
|
|
|
|
/// from id, form's last update index, ReplicatedData
|
|
|
|
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Crdt {
|
|
|
|
pub fn new(me: ReplicatedData) -> Crdt {
|
|
|
|
assert_eq!(me.version, 0);
|
|
|
|
let mut g = Crdt {
|
|
|
|
table: HashMap::new(),
|
|
|
|
local: HashMap::new(),
|
|
|
|
remote: HashMap::new(),
|
|
|
|
me: me.id,
|
|
|
|
update_index: 1,
|
|
|
|
timeout: Duration::new(0, 100_000),
|
|
|
|
};
|
|
|
|
g.local.insert(me.id, g.update_index);
|
|
|
|
g.table.insert(me.id, me);
|
|
|
|
g
|
|
|
|
}
|
2018-04-28 00:31:20 -07:00
|
|
|
pub fn my_data(&self) -> &ReplicatedData {
|
|
|
|
&self.table[&self.me]
|
|
|
|
}
|
|
|
|
pub fn leader_data(&self) -> &ReplicatedData {
|
|
|
|
&self.table[&self.table[&self.me].current_leader_id]
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set_leader(&mut self, key: PublicKey) -> () {
|
|
|
|
let mut me = self.my_data().clone();
|
|
|
|
me.current_leader_id = key;
|
|
|
|
me.version += 1;
|
|
|
|
self.insert(me);
|
2018-04-26 13:48:42 -07:00
|
|
|
}
|
2018-04-28 00:31:20 -07:00
|
|
|
|
|
|
|
pub fn insert(&mut self, v: ReplicatedData) {
|
|
|
|
// TODO check that last_verified types are always increasing
|
2018-04-21 11:02:49 -07:00
|
|
|
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
2018-04-28 00:31:20 -07:00
|
|
|
//somehow we signed a message for our own identity with a higher version that
|
|
|
|
// we have stored ourselves
|
|
|
|
trace!("me: {:?}", self.me[0]);
|
|
|
|
trace!("v.id: {:?}", v.id[0]);
|
2018-04-21 11:02:49 -07:00
|
|
|
trace!("insert! {}", v.version);
|
|
|
|
self.update_index += 1;
|
2018-04-28 00:31:20 -07:00
|
|
|
let _ = self.table.insert(v.id.clone(), v.clone());
|
2018-04-21 11:02:49 -07:00
|
|
|
let _ = self.local.insert(v.id, self.update_index);
|
|
|
|
} else {
|
2018-04-28 00:31:20 -07:00
|
|
|
trace!(
|
|
|
|
"INSERT FAILED new.version: {} me.version: {}",
|
|
|
|
v.version,
|
|
|
|
self.table[&v.id].version
|
|
|
|
);
|
2018-04-21 11:02:49 -07:00
|
|
|
}
|
|
|
|
}
|
2018-04-28 00:31:20 -07:00
|
|
|
|
|
|
|
/// broadcast messages from the leader to layer 1 nodes
|
|
|
|
/// # Remarks
|
|
|
|
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
|
|
|
pub fn broadcast(
|
|
|
|
obj: &Arc<RwLock<Self>>,
|
|
|
|
blobs: &Vec<SharedBlob>,
|
|
|
|
s: &UdpSocket,
|
|
|
|
transmit_index: &mut u64,
|
|
|
|
) -> Result<()> {
|
|
|
|
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
|
|
|
// copy to avoid locking durring IO
|
|
|
|
let robj = obj.read().unwrap();
|
|
|
|
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
|
|
|
(robj.table[&robj.me].clone(), cloned_table)
|
|
|
|
};
|
2018-05-04 11:11:39 -07:00
|
|
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
|
|
let items: Vec<(usize, &ReplicatedData)> = table
|
2018-04-28 00:31:20 -07:00
|
|
|
.iter()
|
2018-05-04 11:11:39 -07:00
|
|
|
.filter(|v| {
|
2018-04-28 00:31:20 -07:00
|
|
|
if me.id == v.id {
|
2018-05-04 11:11:39 -07:00
|
|
|
//filter myself
|
|
|
|
false
|
|
|
|
} else if v.replicate_addr == daddr {
|
|
|
|
//filter nodes that are not listening
|
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
2018-04-28 00:31:20 -07:00
|
|
|
}
|
2018-05-04 11:11:39 -07:00
|
|
|
})
|
|
|
|
.enumerate()
|
|
|
|
.collect();
|
|
|
|
let orders: Vec<_> = items.into_iter().cycle().zip(blobs.iter()).collect();
|
|
|
|
let errs: Vec<_> = orders
|
|
|
|
.into_par_iter()
|
|
|
|
.map(|((i, v), b)| {
|
2018-04-28 00:31:20 -07:00
|
|
|
// only leader should be broadcasting
|
|
|
|
assert!(me.current_leader_id != v.id);
|
|
|
|
let mut blob = b.write().unwrap();
|
2018-05-04 11:11:39 -07:00
|
|
|
blob.set_id(me.id).expect("set_id");
|
2018-04-28 00:31:20 -07:00
|
|
|
blob.set_index(*transmit_index + i as u64)
|
|
|
|
.expect("set_index");
|
2018-05-04 11:11:39 -07:00
|
|
|
//TODO profile this, may need multiple sockets for par_iter
|
2018-04-28 00:31:20 -07:00
|
|
|
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
for e in errs {
|
|
|
|
trace!("retransmit result {:?}", e);
|
|
|
|
match e {
|
|
|
|
Err(e) => return Err(Error::IO(e)),
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
*transmit_index += 1;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// retransmit messages from the leader to layer 1 nodes
|
|
|
|
/// # Remarks
|
|
|
|
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
|
|
|
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
|
|
|
|
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
|
|
|
// copy to avoid locking durring IO
|
|
|
|
let s = obj.read().unwrap();
|
|
|
|
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
|
|
|
};
|
|
|
|
let rblob = blob.read().unwrap();
|
2018-05-04 11:11:39 -07:00
|
|
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
|
|
|
let orders: Vec<_> = table
|
|
|
|
.iter()
|
|
|
|
.filter(|v| {
|
2018-04-28 00:31:20 -07:00
|
|
|
if me.id == v.id {
|
2018-05-04 11:11:39 -07:00
|
|
|
false
|
|
|
|
} else if me.current_leader_id == v.id {
|
|
|
|
trace!("skip retransmit to leader {:?}", v.id);
|
|
|
|
false
|
|
|
|
} else if v.replicate_addr == daddr {
|
|
|
|
trace!("skip nodes that are not listening {:?}", v.id);
|
|
|
|
false
|
|
|
|
} else {
|
|
|
|
true
|
2018-04-28 00:31:20 -07:00
|
|
|
}
|
2018-05-04 11:11:39 -07:00
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
let errs: Vec<_> = orders
|
|
|
|
.par_iter()
|
|
|
|
.map(|v| {
|
2018-04-28 00:31:20 -07:00
|
|
|
trace!("retransmit blob to {}", v.replicate_addr);
|
2018-05-04 11:11:39 -07:00
|
|
|
//TODO profile this, may need multiple sockets for par_iter
|
2018-04-28 00:31:20 -07:00
|
|
|
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
for e in errs {
|
|
|
|
trace!("retransmit result {:?}", e);
|
|
|
|
match e {
|
|
|
|
Err(e) => return Err(Error::IO(e)),
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2018-04-21 11:02:49 -07:00
|
|
|
fn random() -> u64 {
|
|
|
|
let rnd = SystemRandom::new();
|
|
|
|
let mut buf = [0u8; 8];
|
|
|
|
rnd.fill(&mut buf).unwrap();
|
|
|
|
let mut rdr = Cursor::new(&buf);
|
|
|
|
rdr.read_u64::<LittleEndian>().unwrap()
|
|
|
|
}
|
|
|
|
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
|
2018-04-28 00:31:20 -07:00
|
|
|
//trace!("get updates since {}", v);
|
2018-04-21 11:02:49 -07:00
|
|
|
let data = self.table
|
|
|
|
.values()
|
|
|
|
.filter(|x| self.local[&x.id] > v)
|
|
|
|
.cloned()
|
|
|
|
.collect();
|
|
|
|
let id = self.me;
|
|
|
|
let ups = self.update_index;
|
|
|
|
(id, ups, data)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a random gossip request
|
|
|
|
/// # Returns
|
2018-04-28 00:31:20 -07:00
|
|
|
/// (A,B)
|
|
|
|
/// * A - Address to send to
|
|
|
|
/// * B - RequestUpdates protocol message
|
2018-05-04 11:11:39 -07:00
|
|
|
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
|
|
|
if self.table.len() <= 1 {
|
|
|
|
return Err(Error::GeneralError);
|
|
|
|
}
|
|
|
|
let mut n = (Self::random() as usize) % self.table.len();
|
|
|
|
while self.table.values().nth(n).unwrap().id == self.me {
|
|
|
|
n = (Self::random() as usize) % self.table.len();
|
|
|
|
}
|
2018-04-21 11:02:49 -07:00
|
|
|
let v = self.table.values().nth(n).unwrap().clone();
|
|
|
|
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
2018-04-26 13:48:42 -07:00
|
|
|
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
2018-05-04 11:11:39 -07:00
|
|
|
Ok((v.gossip_addr, req))
|
2018-04-21 11:02:49 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// At random pick a node and try to get updated changes from them
|
|
|
|
fn run_gossip(obj: &Arc<RwLock<Self>>) -> Result<()> {
|
|
|
|
//TODO we need to keep track of stakes and weight the selection by stake size
|
|
|
|
//TODO cache sockets
|
|
|
|
|
|
|
|
// Lock the object only to do this operation and not for any longer
|
|
|
|
// especially not when doing the `sock.send_to`
|
2018-05-04 11:11:39 -07:00
|
|
|
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?;
|
2018-04-26 13:54:29 -07:00
|
|
|
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
2018-04-21 11:02:49 -07:00
|
|
|
// TODO this will get chatty, so we need to first ask for number of updates since
|
|
|
|
// then only ask for specific data that we dont have
|
2018-04-26 13:48:42 -07:00
|
|
|
let r = serialize(&req)?;
|
2018-04-21 11:02:49 -07:00
|
|
|
sock.send_to(&r, remote_gossip_addr)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Apply updates that we received from the identity `from`
|
|
|
|
/// # Arguments
|
|
|
|
/// * `from` - identity of the sender of the updates
|
|
|
|
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
|
|
|
|
/// * `data` - the update data
|
2018-04-26 13:48:42 -07:00
|
|
|
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
|
2018-04-21 11:02:49 -07:00
|
|
|
trace!("got updates {}", data.len());
|
|
|
|
// TODO we need to punish/spam resist here
|
|
|
|
// sig verify the whole update and slash anyone who sends a bad update
|
|
|
|
for v in data {
|
2018-04-28 00:31:20 -07:00
|
|
|
self.insert(v.clone());
|
2018-04-21 11:02:49 -07:00
|
|
|
}
|
|
|
|
*self.remote.entry(from).or_insert(update_index) = update_index;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// randomly pick a node and ask them for updates asynchronously
|
|
|
|
pub fn gossip(obj: Arc<RwLock<Self>>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
|
|
|
spawn(move || loop {
|
|
|
|
let _ = Self::run_gossip(&obj);
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
//TODO this should be a tuned parameter
|
|
|
|
sleep(obj.read().unwrap().timeout);
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Process messages from the network
|
|
|
|
fn run_listen(obj: &Arc<RwLock<Self>>, sock: &UdpSocket) -> Result<()> {
|
|
|
|
//TODO cache connections
|
|
|
|
let mut buf = vec![0u8; 1024 * 64];
|
|
|
|
let (amt, src) = sock.recv_from(&mut buf)?;
|
|
|
|
trace!("got request from {}", src);
|
|
|
|
buf.resize(amt, 0);
|
|
|
|
let r = deserialize(&buf)?;
|
|
|
|
match r {
|
|
|
|
// TODO sigverify these
|
2018-04-26 13:48:42 -07:00
|
|
|
Protocol::RequestUpdates(v, reqdata) => {
|
2018-04-21 11:02:49 -07:00
|
|
|
trace!("RequestUpdates {}", v);
|
2018-04-26 13:48:42 -07:00
|
|
|
let addr = reqdata.gossip_addr;
|
2018-04-21 11:02:49 -07:00
|
|
|
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
|
|
|
|
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
|
|
|
|
trace!("get updates since response {} {}", v, data.len());
|
|
|
|
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
|
|
|
trace!("send_to {}", addr);
|
2018-04-26 13:48:42 -07:00
|
|
|
//TODO verify reqdata belongs to sender
|
2018-04-28 00:31:20 -07:00
|
|
|
obj.write().unwrap().insert(reqdata);
|
2018-04-21 11:02:49 -07:00
|
|
|
sock.send_to(&rsp, addr).unwrap();
|
|
|
|
trace!("send_to done!");
|
|
|
|
}
|
|
|
|
Protocol::ReceiveUpdates(from, ups, data) => {
|
|
|
|
trace!("ReceivedUpdates");
|
2018-04-26 13:48:42 -07:00
|
|
|
obj.write().unwrap().apply_updates(from, ups, &data);
|
2018-04-21 11:02:49 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
pub fn listen(
|
|
|
|
obj: Arc<RwLock<Self>>,
|
|
|
|
sock: UdpSocket,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
) -> JoinHandle<()> {
|
|
|
|
sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap();
|
|
|
|
spawn(move || loop {
|
|
|
|
let _ = Self::run_listen(&obj, &sock);
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use crdt::{Crdt, ReplicatedData};
|
2018-04-28 00:31:20 -07:00
|
|
|
use logger;
|
|
|
|
use packet::Blob;
|
|
|
|
use rayon::iter::*;
|
2018-04-21 11:02:49 -07:00
|
|
|
use signature::KeyPair;
|
|
|
|
use signature::KeyPairUtil;
|
|
|
|
use std::net::UdpSocket;
|
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
|
use std::sync::{Arc, RwLock};
|
2018-04-26 13:48:42 -07:00
|
|
|
use std::thread::{sleep, JoinHandle};
|
2018-04-21 11:02:49 -07:00
|
|
|
use std::time::Duration;
|
|
|
|
|
2018-04-28 00:31:20 -07:00
|
|
|
fn test_node() -> (Crdt, UdpSocket, UdpSocket, UdpSocket) {
|
|
|
|
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
|
|
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
|
|
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
|
|
|
|
let pubkey = KeyPair::new().pubkey();
|
|
|
|
let d = ReplicatedData::new(
|
|
|
|
pubkey,
|
|
|
|
gossip.local_addr().unwrap(),
|
|
|
|
replicate.local_addr().unwrap(),
|
|
|
|
serve.local_addr().unwrap(),
|
|
|
|
);
|
|
|
|
let crdt = Crdt::new(d);
|
|
|
|
trace!(
|
|
|
|
"id: {} gossip: {} replicate: {} serve: {}",
|
|
|
|
crdt.my_data().id[0],
|
|
|
|
gossip.local_addr().unwrap(),
|
|
|
|
replicate.local_addr().unwrap(),
|
|
|
|
serve.local_addr().unwrap(),
|
|
|
|
);
|
|
|
|
(crdt, gossip, replicate, serve)
|
|
|
|
}
|
|
|
|
|
2018-04-21 11:02:49 -07:00
|
|
|
/// Test that the network converges.
|
|
|
|
/// Run until every node in the network has a full ReplicatedData set.
|
|
|
|
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
|
2018-04-26 13:50:57 -07:00
|
|
|
/// tests that actually use this function are below
|
2018-04-26 13:48:42 -07:00
|
|
|
fn run_gossip_topo<F>(topo: F)
|
|
|
|
where
|
|
|
|
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
|
|
|
|
{
|
2018-04-21 11:02:49 -07:00
|
|
|
let num: usize = 5;
|
|
|
|
let exit = Arc::new(AtomicBool::new(false));
|
|
|
|
let listen: Vec<_> = (0..num)
|
|
|
|
.map(|_| {
|
2018-04-28 00:31:20 -07:00
|
|
|
let (crdt, gossip, _, _) = test_node();
|
2018-04-21 11:02:49 -07:00
|
|
|
let c = Arc::new(RwLock::new(crdt));
|
2018-04-28 00:31:20 -07:00
|
|
|
let l = Crdt::listen(c.clone(), gossip, exit.clone());
|
2018-04-21 11:02:49 -07:00
|
|
|
(c, l)
|
|
|
|
})
|
|
|
|
.collect();
|
2018-04-26 13:48:42 -07:00
|
|
|
topo(&listen);
|
2018-04-21 11:02:49 -07:00
|
|
|
let gossip: Vec<_> = listen
|
|
|
|
.iter()
|
|
|
|
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
|
|
|
|
.collect();
|
|
|
|
let mut done = true;
|
|
|
|
for _ in 0..(num * 16) {
|
|
|
|
done = true;
|
|
|
|
for &(ref c, _) in listen.iter() {
|
|
|
|
trace!(
|
|
|
|
"done updates {} {}",
|
|
|
|
c.read().unwrap().table.len(),
|
|
|
|
c.read().unwrap().update_index
|
|
|
|
);
|
|
|
|
//make sure the number of updates doesn't grow unbounded
|
|
|
|
assert!(c.read().unwrap().update_index <= num as u64);
|
|
|
|
//make sure we got all the updates
|
|
|
|
if c.read().unwrap().table.len() != num {
|
|
|
|
done = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if done == true {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
sleep(Duration::new(1, 0));
|
|
|
|
}
|
|
|
|
exit.store(true, Ordering::Relaxed);
|
|
|
|
for j in gossip {
|
|
|
|
j.join().unwrap();
|
|
|
|
}
|
|
|
|
for (c, j) in listen.into_iter() {
|
|
|
|
j.join().unwrap();
|
|
|
|
// make it clear what failed
|
|
|
|
// protocol is to chatty, updates should stop after everyone receives `num`
|
|
|
|
assert!(c.read().unwrap().update_index <= num as u64);
|
|
|
|
// protocol is not chatty enough, everyone should get `num` entries
|
|
|
|
assert_eq!(c.read().unwrap().table.len(), num);
|
|
|
|
}
|
|
|
|
assert!(done);
|
|
|
|
}
|
2018-04-26 13:48:42 -07:00
|
|
|
/// ring a -> b -> c -> d -> e -> a
|
|
|
|
#[test]
|
|
|
|
fn gossip_ring_test() {
|
|
|
|
run_gossip_topo(|listen| {
|
|
|
|
let num = listen.len();
|
|
|
|
for n in 0..num {
|
|
|
|
let y = n % listen.len();
|
|
|
|
let x = (n + 1) % listen.len();
|
|
|
|
let mut xv = listen[x].0.write().unwrap();
|
|
|
|
let yv = listen[y].0.read().unwrap();
|
|
|
|
let mut d = yv.table[&yv.me].clone();
|
|
|
|
d.version = 0;
|
2018-04-28 00:31:20 -07:00
|
|
|
xv.insert(d);
|
2018-04-26 13:48:42 -07:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
/// star (b,c,d,e) -> a
|
|
|
|
#[test]
|
|
|
|
fn gossip_star_test() {
|
|
|
|
run_gossip_topo(|listen| {
|
|
|
|
let num = listen.len();
|
|
|
|
for n in 0..(num - 1) {
|
|
|
|
let x = 0;
|
|
|
|
let y = (n + 1) % listen.len();
|
|
|
|
let mut xv = listen[x].0.write().unwrap();
|
|
|
|
let yv = listen[y].0.read().unwrap();
|
|
|
|
let mut d = yv.table[&yv.me].clone();
|
|
|
|
d.version = 0;
|
2018-04-28 00:31:20 -07:00
|
|
|
xv.insert(d);
|
2018-04-26 13:48:42 -07:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2018-04-21 11:02:49 -07:00
|
|
|
/// Test that insert drops messages that are older
|
|
|
|
#[test]
|
|
|
|
fn insert_test() {
|
2018-04-28 00:31:20 -07:00
|
|
|
let mut d = ReplicatedData::new(
|
|
|
|
KeyPair::new().pubkey(),
|
|
|
|
"127.0.0.1:1234".parse().unwrap(),
|
|
|
|
"127.0.0.1:1235".parse().unwrap(),
|
|
|
|
"127.0.0.1:1236".parse().unwrap(),
|
|
|
|
);
|
2018-04-21 11:02:49 -07:00
|
|
|
assert_eq!(d.version, 0);
|
|
|
|
let mut crdt = Crdt::new(d.clone());
|
|
|
|
assert_eq!(crdt.table[&d.id].version, 0);
|
|
|
|
d.version = 2;
|
2018-04-28 00:31:20 -07:00
|
|
|
crdt.insert(d.clone());
|
2018-04-21 11:02:49 -07:00
|
|
|
assert_eq!(crdt.table[&d.id].version, 2);
|
|
|
|
d.version = 1;
|
2018-04-28 00:31:20 -07:00
|
|
|
crdt.insert(d.clone());
|
2018-04-21 11:02:49 -07:00
|
|
|
assert_eq!(crdt.table[&d.id].version, 2);
|
|
|
|
}
|
|
|
|
|
2018-04-28 00:31:20 -07:00
|
|
|
#[test]
|
|
|
|
pub fn test_crdt_retransmit() {
|
|
|
|
logger::setup();
|
|
|
|
trace!("c1:");
|
|
|
|
let (mut c1, s1, r1, e1) = test_node();
|
|
|
|
trace!("c2:");
|
|
|
|
let (mut c2, s2, r2, _) = test_node();
|
|
|
|
trace!("c3:");
|
|
|
|
let (mut c3, s3, r3, _) = test_node();
|
|
|
|
let c1_id = c1.my_data().id;
|
|
|
|
c1.set_leader(c1_id);
|
|
|
|
|
|
|
|
c2.insert(c1.my_data().clone());
|
|
|
|
c3.insert(c1.my_data().clone());
|
|
|
|
|
|
|
|
c2.set_leader(c1.my_data().id);
|
|
|
|
c3.set_leader(c1.my_data().id);
|
|
|
|
|
|
|
|
let exit = Arc::new(AtomicBool::new(false));
|
|
|
|
|
|
|
|
// Create listen threads
|
|
|
|
let a1 = Arc::new(RwLock::new(c1));
|
|
|
|
let t1 = Crdt::listen(a1.clone(), s1, exit.clone());
|
|
|
|
|
|
|
|
let a2 = Arc::new(RwLock::new(c2));
|
|
|
|
let t2 = Crdt::listen(a2.clone(), s2, exit.clone());
|
|
|
|
|
|
|
|
let a3 = Arc::new(RwLock::new(c3));
|
|
|
|
let t3 = Crdt::listen(a3.clone(), s3, exit.clone());
|
|
|
|
|
|
|
|
// Create gossip threads
|
|
|
|
let t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
|
|
|
|
let t2_gossip = Crdt::gossip(a2.clone(), exit.clone());
|
|
|
|
let t3_gossip = Crdt::gossip(a3.clone(), exit.clone());
|
|
|
|
|
|
|
|
//wait to converge
|
|
|
|
trace!("waitng to converge:");
|
|
|
|
let mut done = false;
|
|
|
|
for _ in 0..10 {
|
|
|
|
done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3
|
|
|
|
&& a3.read().unwrap().table.len() == 3;
|
|
|
|
if done {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
sleep(Duration::new(1, 0));
|
|
|
|
}
|
|
|
|
assert!(done);
|
|
|
|
let mut b = Blob::default();
|
|
|
|
b.meta.size = 10;
|
|
|
|
Crdt::retransmit(&a1, &Arc::new(RwLock::new(b)), &e1).unwrap();
|
|
|
|
let res: Vec<_> = [r1, r2, r3]
|
|
|
|
.into_par_iter()
|
|
|
|
.map(|s| {
|
|
|
|
let mut b = Blob::default();
|
|
|
|
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
|
|
|
let res = s.recv_from(&mut b.data);
|
|
|
|
res.is_err() //true if failed to receive the retransmit packet
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
//true if failed receive the retransmit packet, r2, and r3 should succeed
|
|
|
|
//r1 was the sender, so it should fail to receive the packet
|
|
|
|
assert_eq!(res, [true, false, false]);
|
|
|
|
exit.store(true, Ordering::Relaxed);
|
|
|
|
let threads = vec![t1, t2, t3, t1_gossip, t2_gossip, t3_gossip];
|
|
|
|
for t in threads.into_iter() {
|
|
|
|
t.join().unwrap();
|
|
|
|
}
|
|
|
|
}
|
2018-04-21 11:02:49 -07:00
|
|
|
}
|