//! The `crdt` module defines a data structure that is shared by all the nodes in the network over
//! a gossip control plane. The goal is to share small bits of off-chain information and detect and
//! repair partitions.
//!
//! This CRDT only supports a very limited set of types. A map of PublicKey -> Versioned Struct.
//! The last version is always picked during an update.
//!
//! The network is arranged in layers:
//!
//! * layer 0 - Leader.
//! * layer 1 - As many nodes as we can fit.
//! * layer 2 - Everyone else; if layer 1 is `2^10` nodes, layer 2 should be able to hold `2^20` nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight.
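//!
//! A minimal sketch of the merge rule (assuming two `ReplicatedData` values,
//! `alice` and `bob`, built with `ReplicatedData::new`):
//!
//! ```ignore
//! let mut crdt = Crdt::new(alice); // `alice.version` must be 0
//! crdt.insert(&bob);               // unknown key, accepted
//! let mut newer = bob.clone();
//! newer.version += 1;
//! crdt.insert(&newer);             // higher version, replaces the old entry
//! crdt.insert(&bob);               // stale version, dropped
//! ```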

use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom};
use signature::{KeyPair, KeyPairUtil};
use signature::{PublicKey, Signature};
use std;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender};

/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData {
    pub id: PublicKey,
    sig: Signature,
    /// should always be increasing
    pub version: u64,
    /// address to connect to for gossip
    pub gossip_addr: SocketAddr,
    /// address to connect to for replication
    pub replicate_addr: SocketAddr,
    /// address to connect to when this node is leader
    pub requests_addr: SocketAddr,
    /// transactions address
    pub transactions_addr: SocketAddr,
    /// current leader identity
    pub current_leader_id: PublicKey,
    /// last verified hash that was submitted to the leader
    last_verified_hash: Hash,
    /// last verified count, always increasing
    last_verified_count: u64,
}

impl ReplicatedData {
    pub fn new(
        id: PublicKey,
        gossip_addr: SocketAddr,
        replicate_addr: SocketAddr,
        requests_addr: SocketAddr,
        transactions_addr: SocketAddr,
    ) -> ReplicatedData {
        ReplicatedData {
            id,
            sig: Signature::default(),
            version: 0,
            gossip_addr,
            replicate_addr,
            requests_addr,
            transactions_addr,
            current_leader_id: PublicKey::default(),
            last_verified_hash: Hash::default(),
            last_verified_count: 0,
        }
    }
}

/// `Crdt` structure keeps a table of `ReplicatedData` structs
/// # Properties
/// * `table` - map of public ids to versioned and signed ReplicatedData structs
/// * `local` - map of public ids to the `self.update_index` at which the entry in `self.table` was updated
/// * `remote` - map of public ids to the last `remote.update_index` we have seen from them
/// * `update_index` - my update index
/// # Remarks
/// This implements two services, `gossip` and `listen`.
/// * `gossip` - asynchronously ask nodes to send updates
/// * `listen` - listen for requests and responses
/// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt {
    pub table: HashMap<PublicKey, ReplicatedData>,
    /// Value of my update index when entry in table was updated.
    /// Nodes will ask for updates since `update_index`, and this node
    /// should respond with all the identities whose index in this list is
    /// greater than the request's `update_index`.
    local: HashMap<PublicKey, u64>,
    /// The value of the remote update index that I have last seen
    /// This node will ask external nodes for updates since the value in this list
    pub remote: HashMap<PublicKey, u64>,
    pub update_index: u64,
    pub me: PublicKey,
    timeout: Duration,
}

// TODO These messages should be signed, and go through the GPU pipeline for spam filtering
#[derive(Serialize, Deserialize)]
enum Protocol {
    /// forward your own latest data structure when requesting an update
    /// this doesn't update the `remote` update index, but it allows the
    /// recipient of this request to add knowledge of this node to the network
    RequestUpdates(u64, ReplicatedData),
    //TODO might need a since?
    /// from id, from's last update index, ReplicatedData
    ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
    /// ask for a missing index
    RequestWindowIndex(ReplicatedData, u64),
}

impl Crdt {
    pub fn new(me: ReplicatedData) -> Crdt {
        assert_eq!(me.version, 0);
        let mut g = Crdt {
            table: HashMap::new(),
            local: HashMap::new(),
            remote: HashMap::new(),
            me: me.id,
            update_index: 1,
            timeout: Duration::from_millis(100),
        };
        g.local.insert(me.id, g.update_index);
        g.table.insert(me.id, me);
        g
    }
    pub fn my_data(&self) -> &ReplicatedData {
        &self.table[&self.me]
    }
    pub fn leader_data(&self) -> &ReplicatedData {
        &self.table[&self.table[&self.me].current_leader_id]
    }

    pub fn set_leader(&mut self, key: PublicKey) {
        let mut me = self.my_data().clone();
        me.current_leader_id = key;
        me.version += 1;
        self.insert(&me);
    }

    pub fn insert(&mut self, v: &ReplicatedData) {
        // TODO check that last_verified types are always increasing
        if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
            // somehow we signed a message for our own identity with a higher version than
            // we have stored ourselves
            trace!(
                "me: {:?} v.id: {:?} version: {}",
                &self.me[..4],
                &v.id[..4],
                v.version
            );
            self.update_index += 1;
            let _ = self.table.insert(v.id.clone(), v.clone());
            let _ = self.local.insert(v.id, self.update_index);
        } else {
            trace!(
                "INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}",
                &self.me[..4],
                &v.id[..4],
                v.version,
                self.table[&v.id].version
            );
        }
    }

    /// broadcast messages from the leader to layer 1 nodes
    /// # Remarks
    /// We need to avoid having obj locked while doing any IO, such as the `send_to`
    pub fn broadcast(
        obj: &Arc<RwLock<Self>>,
        blobs: &Vec<SharedBlob>,
        s: &UdpSocket,
        transmit_index: &mut u64,
    ) -> Result<()> {
        let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
            // copy to avoid locking during IO
            let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
            trace!("broadcast table {}", robj.table.len());
            let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
            (robj.table[&robj.me].clone(), cloned_table)
        };
        let daddr = "0.0.0.0:0".parse().unwrap();
        let nodes: Vec<&ReplicatedData> = table
            .iter()
            .filter(|v| {
                if me.id == v.id {
                    //filter myself
                    false
                } else if v.replicate_addr == daddr {
                    //filter nodes that are not listening
                    false
                } else {
                    trace!("broadcast node {}", v.replicate_addr);
                    true
                }
            })
            .collect();
        if nodes.is_empty() {
            warn!("crdt too small");
            return Err(Error::CrdtTooSmall);
        }
        trace!("nodes table {}", nodes.len());
        trace!("blobs table {}", blobs.len());
        // enumerate all the blobs, those are the indices
        // transmit them to nodes, starting from a different node
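        // e.g. (an illustrative sketch) with 3 nodes and *transmit_index == 4,
        // the cycle starts at nodes[4 % 3] = nodes[1]: blob 0 goes to nodes[1],
        // blob 1 to nodes[2], blob 2 to nodes[0], and so on.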
        let orders: Vec<_> = blobs
            .iter()
            .enumerate()
            .zip(
                nodes
                    .iter()
                    .cycle()
                    .skip((*transmit_index as usize) % nodes.len()),
            )
            .collect();
        trace!("orders table {}", orders.len());
        let errs: Vec<_> = orders
            .into_iter()
            .map(|((i, b), v)| {
                // only leader should be broadcasting
                assert!(me.current_leader_id != v.id);
                let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
                blob.set_id(me.id).expect("set_id in pub fn broadcast");
                blob.set_index(*transmit_index + i as u64)
                    .expect("set_index in pub fn broadcast");
                //TODO profile this, may need multiple sockets for par_iter
                trace!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
                assert!(blob.meta.size < BLOB_SIZE);
                let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
                trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
                e
            })
            .collect();
        trace!("broadcast results {}", errs.len());
        for e in errs {
            match e {
                Err(e) => {
                    error!("broadcast result {:?}", e);
                    return Err(Error::IO(e));
                }
                _ => (),
            }
            *transmit_index += 1;
        }
        Ok(())
    }

    /// retransmit messages from the leader to layer 1 nodes
    /// # Remarks
    /// We need to avoid having obj locked while doing any IO, such as the `send_to`
    pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
        let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
            // copy to avoid locking during IO
            let s = obj.read().expect("'obj' read lock in pub fn retransmit");
            (s.table[&s.me].clone(), s.table.values().cloned().collect())
        };
        blob.write()
            .unwrap()
            .set_id(me.id)
            .expect("set_id in pub fn retransmit");
        let rblob = blob.read().unwrap();
        let daddr = "0.0.0.0:0".parse().unwrap();
        let orders: Vec<_> = table
            .iter()
            .filter(|v| {
                if me.id == v.id {
                    false
                } else if me.current_leader_id == v.id {
                    trace!("skip retransmit to leader {:?}", v.id);
                    false
                } else if v.replicate_addr == daddr {
                    trace!("skip nodes that are not listening {:?}", v.id);
                    false
                } else {
                    true
                }
            })
            .collect();
        let errs: Vec<_> = orders
            .par_iter()
            .map(|v| {
                trace!(
                    "retransmit blob {} to {}",
                    rblob.get_index().unwrap(),
                    v.replicate_addr
                );
                //TODO profile this, may need multiple sockets for par_iter
                assert!(rblob.meta.size < BLOB_SIZE);
                s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
            })
            .collect();
        for e in errs {
            match e {
                Err(e) => {
                    info!("retransmit error {:?}", e);
                    return Err(Error::IO(e));
                }
                _ => (),
            }
        }
        Ok(())
    }

    // max number of nodes that we could be converged to
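    // e.g. (an illustrative sketch) with remote = {a: 3, b: 5}: max = 2 + 1 = 3,
    // and the fold yields min(3, 3, 5) = 3 -- the smallest update index any
    // peer has acknowledged, capped at the number of known peers + 1.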
    pub fn convergence(&self) -> u64 {
        let max = self.remote.values().len() as u64 + 1;
        self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
    }

    fn random() -> u64 {
        let rnd = SystemRandom::new();
        let mut buf = [0u8; 8];
        rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
        let mut rdr = Cursor::new(&buf);
        rdr.read_u64::<LittleEndian>()
            .expect("rdr.read_u64 in fn random")
    }
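
    // e.g. (an illustrative sketch) if local = {a: 1, b: 4, c: 7}, then
    // get_updates_since(3) returns the entries for b and c, along with our
    // id and current update_index so the peer can record how far it caught up.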
    fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
        //trace!("get updates since {}", v);
        let data = self.table
            .values()
            .filter(|x| self.local[&x.id] > v)
            .cloned()
            .collect();
        let id = self.me;
        let ups = self.update_index;
        (id, ups, data)
    }

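    /// Pick a random peer (excluding ourselves and nodes that are not
    /// listening) and build a serialized `RequestWindowIndex` asking it for
    /// the blob at index `ix`; returns the peer's gossip address and the bytes.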
    pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
        let daddr = "0.0.0.0:0".parse().unwrap();
        let valid: Vec<_> = self.table
            .values()
            .filter(|r| r.id != self.me && r.replicate_addr != daddr)
            .collect();
        if valid.is_empty() {
            return Err(Error::CrdtTooSmall);
        }
        let n = (Self::random() as usize) % valid.len();
        let addr = valid[n].gossip_addr.clone();
        let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
        let out = serialize(&req)?;
        Ok((addr, out))
    }

    /// Create a random gossip request
    /// # Returns
    /// (A, B)
    /// * A - Address to send to
    /// * B - RequestUpdates protocol message
    fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
        let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
        if options.is_empty() {
            trace!(
                "crdt too small for gossip {:?} {}",
                &self.me[..4],
                self.table.len()
            );
            return Err(Error::CrdtTooSmall);
        }
        let n = (Self::random() as usize) % options.len();
        let v = options[n].clone();
        let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
        let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
        trace!(
            "created gossip request from {:?} to {:?} {}",
            &self.me[..4],
            &v.id[..4],
            v.gossip_addr
        );
        Ok((v.gossip_addr, req))
    }

    /// At random pick a node and try to get updated changes from them
    fn run_gossip(
        obj: &Arc<RwLock<Self>>,
        blob_sender: &BlobSender,
        blob_recycler: &BlobRecycler,
    ) -> Result<()> {
        //TODO we need to keep track of stakes and weight the selection by stake size
        //TODO cache sockets

        // Lock the object only to do this operation and not for any longer,
        // especially not when doing the `sock.send_to`
        let (remote_gossip_addr, req) = obj.read()
            .expect("'obj' read lock in fn run_gossip")
            .gossip_request()?;
        // TODO this will get chatty, so we need to first ask for number of updates since
        // then only ask for specific data that we don't have
        let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
        let mut q: VecDeque<SharedBlob> = VecDeque::new();
        q.push_back(blob);
        blob_sender.send(q)?;
        Ok(())
    }

    /// Apply updates that we received from the identity `from`
    /// # Arguments
    /// * `from` - identity of the sender of the updates
    /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
    /// * `data` - the update data
    fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
        trace!("got updates {}", data.len());
        // TODO we need to punish/spam resist here
        // sig verify the whole update and slash anyone who sends a bad update
        for v in data {
            self.insert(&v);
        }
        *self.remote.entry(from).or_insert(update_index) = update_index;
    }

    /// randomly pick a node and ask them for updates asynchronously
    pub fn gossip(
        obj: Arc<RwLock<Self>>,
        blob_recycler: BlobRecycler,
        blob_sender: BlobSender,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        Builder::new()
            .name("solana-gossip".to_string())
            .spawn(move || loop {
                let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
                if exit.load(Ordering::Relaxed) {
                    return;
                }
                //TODO this should be a tuned parameter
                sleep(
                    obj.read()
                        .expect("'obj' read lock in pub fn gossip")
                        .timeout,
                );
            })
            .unwrap()
    }

    fn run_window_request(
        window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
        from: &ReplicatedData,
        ix: u64,
        blob_recycler: &BlobRecycler,
    ) -> Option<SharedBlob> {
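        // The window is a fixed-size ring: e.g. (an illustrative sketch) with
        // 1024 slots, blob index 1500 maps to slot 1500 % 1024 = 476.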
        let pos = (ix as usize) % window.read().unwrap().len();
        if let &Some(ref blob) = &window.read().unwrap()[pos] {
            let rblob = blob.read().unwrap();
            let blob_ix = rblob.get_index().expect("run_window_request get_index");
            if blob_ix == ix {
                let out = blob_recycler.allocate();
                // copy to avoid doing IO inside the lock
                {
                    let mut outblob = out.write().unwrap();
                    let sz = rblob.meta.size;
                    outblob.meta.size = sz;
                    outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
                    outblob.meta.set_addr(&from.replicate_addr);
                    //TODO, set the sender id to the requester so we don't retransmit
                    //come up with a cleaner solution for this when sender signatures are checked
                    outblob.set_id(from.id).expect("blob set_id");
                }
                return Some(out);
            }
        } else {
            assert!(window.read().unwrap()[pos].is_none());
            info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
        }
        None
    }

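    // A gossip round trip, informally: node A sends `RequestUpdates(since, A's
    // own data)` to a random peer B; B inserts A's data and replies with
    // `ReceiveUpdates(B, B's update_index, entries newer than since)`; A then
    // applies those entries and records B's update_index in `remote`.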
    //TODO we should first coalesce all the requests
    fn handle_blob(
        obj: &Arc<RwLock<Self>>,
        window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
        blob_recycler: &BlobRecycler,
        blob: &Blob,
    ) -> Option<SharedBlob> {
        match deserialize(&blob.data[..blob.meta.size]) {
            // TODO sigverify these
            Ok(Protocol::RequestUpdates(v, reqdata)) => {
                trace!("RequestUpdates {}", v);
                let addr = reqdata.gossip_addr;
                // only lock for this call, don't lock during IO `sock.send_to` or `sock.recv_from`
                let (from, ups, data) = obj.read()
                    .expect("'obj' read lock in RequestUpdates")
                    .get_updates_since(v);
                trace!("get updates since response {} {}", v, data.len());
                let len = data.len();
                let rsp = Protocol::ReceiveUpdates(from, ups, data);
                obj.write().unwrap().insert(&reqdata);
                if len < 1 {
                    let me = obj.read().unwrap();
                    trace!(
                        "no updates me {:?} ix {} since {}",
                        &me.me[..4],
                        me.update_index,
                        v
                    );
                    None
                } else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
                    trace!(
                        "sending updates me {:?} len {} to {:?} {}",
                        &obj.read().unwrap().me[..4],
                        len,
                        &reqdata.id[..4],
                        addr,
                    );
                    Some(r)
                } else {
                    warn!("to_blob failed");
                    None
                }
            }
            Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
                trace!("ReceiveUpdates {:?} {} {}", &from[0..4], ups, data.len());
                obj.write()
                    .expect("'obj' write lock in ReceiveUpdates")
                    .apply_updates(from, ups, &data);
                None
            }
            Ok(Protocol::RequestWindowIndex(from, ix)) => {
                //TODO verify `from` is signed
                obj.write().unwrap().insert(&from);
                let me = obj.read().unwrap().my_data().clone();
                trace!(
                    "received RequestWindowIndex {} {} myaddr {}",
                    ix,
                    from.replicate_addr,
                    me.replicate_addr
                );
                assert_ne!(from.replicate_addr, me.replicate_addr);
                Self::run_window_request(&window, &from, ix, blob_recycler)
            }
            Err(_) => {
                warn!("deserialize crdt packet failed");
                None
            }
        }
    }

    /// Process messages from the network
    fn run_listen(
        obj: &Arc<RwLock<Self>>,
        window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
        blob_recycler: &BlobRecycler,
        requests_receiver: &BlobReceiver,
        response_sender: &BlobSender,
    ) -> Result<()> {
        //TODO cache connections
        let timeout = Duration::new(1, 0);
        let mut reqs = requests_receiver.recv_timeout(timeout)?;
        while let Ok(mut more) = requests_receiver.try_recv() {
            reqs.append(&mut more);
        }
        let resp: VecDeque<_> = reqs.iter()
            .filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
            .collect();
        response_sender.send(resp)?;
        while let Some(r) = reqs.pop_front() {
            blob_recycler.recycle(r);
        }
        Ok(())
    }

    pub fn listen(
        obj: Arc<RwLock<Self>>,
        window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
        blob_recycler: BlobRecycler,
        requests_receiver: BlobReceiver,
        response_sender: BlobSender,
        exit: Arc<AtomicBool>,
    ) -> JoinHandle<()> {
        Builder::new()
            .name("solana-listen".to_string())
            .spawn(move || loop {
                let e = Self::run_listen(
                    &obj,
                    &window,
                    &blob_recycler,
                    &requests_receiver,
                    &response_sender,
                );
                if e.is_err() {
                    info!(
                        "run_listen timeout, table size: {}",
                        obj.read().unwrap().table.len()
                    );
                }
                if exit.load(Ordering::Relaxed) {
                    return;
                }
            })
            .unwrap()
    }
}

pub struct Sockets {
    pub gossip: UdpSocket,
    pub gossip_send: UdpSocket,
    pub requests: UdpSocket,
    pub replicate: UdpSocket,
    pub transaction: UdpSocket,
    pub respond: UdpSocket,
    pub broadcast: UdpSocket,
}
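
/// Bundle of freshly bound local sockets plus the matching `ReplicatedData`
/// entry; all sockets bind to OS-assigned local ports (used to stand up
/// nodes in tests).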
pub struct TestNode {
    pub data: ReplicatedData,
    pub sockets: Sockets,
}

impl TestNode {
    pub fn new() -> TestNode {
        let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
        let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
        let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
        let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
        let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
        let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
        let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
        let pubkey = KeyPair::new().pubkey();
        let data = ReplicatedData::new(
            pubkey,
            gossip.local_addr().unwrap(),
            replicate.local_addr().unwrap(),
            requests.local_addr().unwrap(),
            transaction.local_addr().unwrap(),
        );
        TestNode {
            data,
            sockets: Sockets {
                gossip,
                gossip_send,
                requests,
                replicate,
                transaction,
                respond,
                broadcast,
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use crdt::{Crdt, ReplicatedData};
    use signature::{KeyPair, KeyPairUtil};

    /// Test that insert drops messages that are older
    #[test]
    fn insert_test() {
        let mut d = ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
        );
        assert_eq!(d.version, 0);
        let mut crdt = Crdt::new(d.clone());
        assert_eq!(crdt.table[&d.id].version, 0);
        d.version = 2;
        crdt.insert(&d);
        assert_eq!(crdt.table[&d.id].version, 2);
        d.version = 1;
        crdt.insert(&d);
        assert_eq!(crdt.table[&d.id].version, 2);
    }
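
    /// A small additional check (a sketch in the same style as `insert_test`):
    /// `set_leader` should bump the local entry's version and record the key.
    #[test]
    fn set_leader_test() {
        let d = ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
        );
        let leader = KeyPair::new().pubkey();
        let mut crdt = Crdt::new(d.clone());
        crdt.set_leader(leader);
        assert_eq!(crdt.table[&d.id].current_leader_id, leader);
        assert_eq!(crdt.table[&d.id].version, 1);
    }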

    fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> {
        let mut copy: Vec<_> = ls.iter().cloned().collect();
        copy.sort_by(|x, y| x.id.cmp(&y.id));
        copy
    }
    #[test]
    fn update_test() {
        let d1 = ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
        );
        let d2 = ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
        );
        let d3 = ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
        );
        let mut crdt = Crdt::new(d1.clone());
        let (key, ix, ups) = crdt.get_updates_since(0);
        assert_eq!(key, d1.id);
        assert_eq!(ix, 1);
        assert_eq!(ups.len(), 1);
        assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
        crdt.insert(&d2);
        let (key, ix, ups) = crdt.get_updates_since(0);
        assert_eq!(key, d1.id);
        assert_eq!(ix, 2);
        assert_eq!(ups.len(), 2);
        assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
        crdt.insert(&d3);
        let (key, ix, ups) = crdt.get_updates_since(0);
        assert_eq!(key, d1.id);
        assert_eq!(ix, 3);
        assert_eq!(ups.len(), 3);
        assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
        let mut crdt2 = Crdt::new(d2.clone());
        crdt2.apply_updates(key, ix, &ups);
        assert_eq!(crdt2.table.values().len(), 3);
        assert_eq!(
            sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
            sorted(&crdt.table.values().map(|x| x.clone()).collect())
        );
    }
}