Crdt pipeline, coalesce window repair requests in the listener by examining all of them at once, and ublock those threads from doing io.

This commit is contained in:
Anatoly Yakovenko 2018-05-27 18:21:39 -07:00 committed by Greg Fitzgerald
parent ef8eac92e3
commit cef1c208a5
11 changed files with 542 additions and 470 deletions

View File

@ -12,6 +12,7 @@ use isatty::stdin_isatty;
use pnet::datalink;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::mint::MintDemo;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
@ -245,9 +246,15 @@ fn converge(
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
let window = default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
spy_ref.clone(),
window.clone(),
spy_gossip,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
//wait for the network to converge
for _ in 0..30 {
let min = spy_ref.read().unwrap().convergence();
@ -257,8 +264,7 @@ fn converge(
}
sleep(Duration::new(1, 0));
}
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
threads.extend(data_replicator.thread_hdls.into_iter());
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()

View File

@ -16,27 +16,30 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use hash::Hash;
use packet::{SharedBlob, BLOB_SIZE};
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom};
use signature::{KeyPair, KeyPairUtil};
use signature::{PublicKey, Signature};
use std;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender};
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData {
pub id: PublicKey,
sig: Signature,
/// should always be increasing
version: u64,
pub version: u64,
/// address to connect to for gossip
pub gossip_addr: SocketAddr,
/// address to connect to for replication
@ -149,15 +152,20 @@ impl Crdt {
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
//somehow we signed a message for our own identity with a higher version that
// we have stored ourselves
trace!("me: {:?}", self.me[0]);
trace!("v.id: {:?}", v.id[0]);
trace!("insert! {}", v.version);
trace!(
"me: {:?} v.id: {:?} version: {}",
&self.me[..4],
&v.id[..4],
v.version
);
self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index);
} else {
trace!(
"INSERT FAILED new.version: {} me.version: {}",
"INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}",
&self.me[..4],
&v.id[..4],
v.version,
self.table[&v.id].version
);
@ -352,18 +360,32 @@ impl Crdt {
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
if options.len() < 1 {
trace!("crdt too small for gossip");
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
return Err(Error::CrdtTooSmall);
}
let n = (Self::random() as usize) % options.len();
let v = options[n].clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
trace!(
"created gossip request from {:?} to {:?} {}",
&self.me[..4],
&v.id[..4],
v.gossip_addr
);
Ok((v.gossip_addr, req))
}
/// At random pick a node and try to get updated changes from them
fn run_gossip(obj: &Arc<RwLock<Self>>) -> Result<()> {
fn run_gossip(
obj: &Arc<RwLock<Self>>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
//TODO we need to keep track of stakes and weight the selection by stake size
//TODO cache sockets
@ -372,12 +394,12 @@ impl Crdt {
let (remote_gossip_addr, req) = obj.read()
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let r = serialize(&req)?;
trace!("sending gossip request to {}", remote_gossip_addr);
sock.send_to(&r, remote_gossip_addr)?;
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
let mut q: VecDeque<SharedBlob> = VecDeque::new();
q.push_back(blob);
blob_sender.send(q)?;
Ok(())
}
@ -397,9 +419,14 @@ impl Crdt {
}
/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(obj: Arc<RwLock<Self>>, exit: Arc<AtomicBool>) -> JoinHandle<()> {
pub fn gossip(
obj: Arc<RwLock<Self>>,
blob_recycler: BlobRecycler,
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::run_gossip(&obj);
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
if exit.load(Ordering::Relaxed) {
return;
}
@ -413,74 +440,87 @@ impl Crdt {
}
fn run_window_request(
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: &UdpSocket,
from: &ReplicatedData,
ix: u64,
) -> Result<()> {
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len();
let mut outblob = vec![];
if let &Some(ref blob) = &window.read().unwrap()[pos] {
let rblob = blob.read().unwrap();
let blob_ix = rblob.get_index().expect("run_window_request get_index");
if blob_ix == ix {
let out = blob_recycler.allocate();
// copy to avoid doing IO inside the lock
outblob.extend(&rblob.data[..rblob.meta.size]);
{
let mut outblob = out.write().unwrap();
let sz = rblob.meta.size;
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
outblob.meta.set_addr(&from.replicate_addr);
//TODO, set the sender id to the requester so we dont retransmit
//come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id");
}
return Some(out);
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
}
if outblob.len() > 0 {
info!(
"responding RequestWindowIndex {} {}",
ix, from.replicate_addr
);
assert!(outblob.len() < BLOB_SIZE);
sock.send_to(&outblob, from.replicate_addr)?;
}
Ok(())
None
}
/// Process messages from the network
fn run_listen(
//TODO we should first coalesce all the requests
fn handle_blob(
obj: &Arc<RwLock<Self>>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: &UdpSocket,
) -> Result<()> {
//TODO cache connections
let mut buf = vec![0u8; BLOB_SIZE];
trace!("recv_from on {}", sock.local_addr().unwrap());
let (amt, src) = sock.recv_from(&mut buf)?;
trace!("got request from {}", src);
buf.resize(amt, 0);
let r = deserialize(&buf)?;
match r {
blob_recycler: &BlobRecycler,
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
// TODO sigverify these
Protocol::RequestUpdates(v, reqdata) => {
trace!("RequestUpdates {} from {}", v, src);
Ok(Protocol::RequestUpdates(v, reqdata)) => {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender
obj.write()
.expect("'obj' write lock in RequestUpdates")
.insert(&reqdata);
assert!(rsp.len() < BLOB_SIZE);
sock.send_to(&rsp, addr)
.expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!");
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data);
obj.write().unwrap().insert(&reqdata);
if len < 1 {
let me = obj.read().unwrap();
trace!(
"no updates me {:?} ix {} since {}",
&me.me[..4],
me.update_index,
v
);
None
} else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
trace!(
"sending updates me {:?} len {} to {:?} {}",
&obj.read().unwrap().me[..4],
len,
&reqdata.id[..4],
addr,
);
Some(r)
} else {
warn!("to_blob failed");
None
}
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates {} from {}", ups, src);
Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
None
}
Protocol::RequestWindowIndex(from, ix) => {
Ok(Protocol::RequestWindowIndex(from, ix)) => {
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
@ -491,21 +531,54 @@ impl Crdt {
me.replicate_addr
);
assert_ne!(from.replicate_addr, me.replicate_addr);
let _ = Self::run_window_request(window, sock, &from, ix);
Self::run_window_request(&window, &from, ix, blob_recycler)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
}
}
}
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
blob_recycler: &BlobRecycler,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let mut reqs = requests_receiver.recv_timeout(timeout)?;
while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more);
}
let resp: VecDeque<_> = reqs.iter()
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
.collect();
response_sender.send(resp)?;
while let Some(r) = reqs.pop_front() {
blob_recycler.recycle(r);
}
Ok(())
}
pub fn listen(
obj: Arc<RwLock<Self>>,
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
sock: UdpSocket,
blob_recycler: BlobRecycler,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
sock.set_read_timeout(Some(Duration::new(2, 0)))
.expect("'sock.set_read_timeout' in crdt.rs");
spawn(move || loop {
let e = Self::run_listen(&obj, &window, &sock);
let e = Self::run_listen(
&obj,
&window,
&blob_recycler,
&requests_receiver,
&response_sender,
);
if e.is_err() {
info!(
"run_listen timeout, table size: {}",
@ -519,134 +592,57 @@ impl Crdt {
}
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, ReplicatedData};
use logger;
use packet::Blob;
use rayon::iter::*;
use signature::KeyPair;
use signature::KeyPairUtil;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
pub struct Sockets {
pub gossip: UdpSocket,
pub gossip_send: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
fn test_node() -> (Crdt, UdpSocket, UdpSocket, UdpSocket) {
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
transactions.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
trace!(
"id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(crdt, gossip, replicate, serve)
TestNode {
data: data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
respond,
broadcast,
},
}
}
}
/// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num)
.map(|_| {
let (crdt, gossip, _, _) = test_node();
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let l = Crdt::listen(c.clone(), w, gossip, exit.clone());
(c, l)
})
.collect();
topo(&listen);
let gossip: Vec<_> = listen
.iter()
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
.collect();
let mut done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _) in listen.iter() {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
break;
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for j in gossip {
j.join().unwrap();
}
for (c, j) in listen.into_iter() {
j.join().unwrap();
// make it clear what failed
// protocol is to chatty, updates should stop after everyone receives `num`
assert!(c.read().unwrap().update_index <= num as u64);
// protocol is not chatty enough, everyone should get `num` entries
assert_eq!(c.read().unwrap().table.len(), num);
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
#[ignore]
fn gossip_ring_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}
/// star (b,c,d,e) -> a
#[test]
#[ignore]
fn gossip_star_test() {
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, ReplicatedData};
use signature::{KeyPair, KeyPairUtil};
/// Test that insert drops messages that are older
#[test]
@ -668,77 +664,59 @@ mod tests {
crdt.insert(&d);
assert_eq!(crdt.table[&d.id].version, 2);
}
#[test]
#[ignore]
pub fn test_crdt_retransmit() {
logger::setup();
trace!("c1:");
let (mut c1, s1, r1, e1) = test_node();
trace!("c2:");
let (mut c2, s2, r2, _) = test_node();
trace!("c3:");
let (mut c3, s3, r3, _) = test_node();
let c1_id = c1.my_data().id;
c1.set_leader(c1_id);
c2.insert(&c1.my_data());
c3.insert(&c1.my_data());
c2.set_leader(c1.my_data().id);
c3.set_leader(c1.my_data().id);
let exit = Arc::new(AtomicBool::new(false));
// Create listen threads
let win1 = Arc::new(RwLock::new(vec![]));
let a1 = Arc::new(RwLock::new(c1));
let t1 = Crdt::listen(a1.clone(), win1, s1, exit.clone());
let a2 = Arc::new(RwLock::new(c2));
let win2 = Arc::new(RwLock::new(vec![]));
let t2 = Crdt::listen(a2.clone(), win2, s2, exit.clone());
let a3 = Arc::new(RwLock::new(c3));
let win3 = Arc::new(RwLock::new(vec![]));
let t3 = Crdt::listen(a3.clone(), win3, s3, exit.clone());
// Create gossip threads
let t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
let t2_gossip = Crdt::gossip(a2.clone(), exit.clone());
let t3_gossip = Crdt::gossip(a3.clone(), exit.clone());
//wait to converge
trace!("waitng to converge:");
let mut done = false;
for _ in 0..30 {
done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3
&& a3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
let mut b = Blob::default();
b.meta.size = 10;
Crdt::retransmit(&a1, &Arc::new(RwLock::new(b)), &e1).unwrap();
let res: Vec<_> = [r1, r2, r3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);
exit.store(true, Ordering::Relaxed);
let threads = vec![t1, t2, t3, t1_gossip, t2_gossip, t3_gossip];
for t in threads.into_iter() {
t.join().unwrap();
}
fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> {
let mut copy: Vec<_> = ls.iter().map(|x| x.clone()).collect();
copy.sort_by(|x, y| x.id.cmp(&y.id));
copy
}
#[test]
fn update_test() {
let d1 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let d2 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let d3 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let mut crdt = Crdt::new(d1.clone());
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 1);
assert_eq!(ups.len(), 1);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
crdt.insert(&d2);
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 2);
assert_eq!(ups.len(), 2);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
crdt.insert(&d3);
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
assert_eq!(ix, 3);
assert_eq!(ups.len(), 3);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
let mut crdt2 = Crdt::new(d2.clone());
crdt2.apply_updates(key, ix, &ups);
assert_eq!(crdt2.table.values().len(), 3);
assert_eq!(
sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
sorted(&crdt.table.values().map(|x| x.clone()).collect())
);
}
}

238
src/data_replicator.rs Normal file
View File

@ -0,0 +1,238 @@
use crdt;
use packet;
use result::Result;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use streamer;
pub struct DataReplicator {
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl DataReplicator {
pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>,
gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
let blob_recycler = packet::BlobRecycler::default();
let (request_sender, request_receiver) = channel();
trace!(
"DataReplicator: id: {:?}, listening on: {:?}",
&crdt.read().unwrap().me[..4],
gossip_listen_socket.local_addr().unwrap()
);
let t_receiver = streamer::blob_receiver(
exit.clone(),
blob_recycler.clone(),
gossip_listen_socket,
request_sender,
)?;
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder(
gossip_send_socket,
exit.clone(),
blob_recycler.clone(),
response_receiver,
);
let t_listen = crdt::Crdt::listen(
crdt.clone(),
window,
blob_recycler.clone(),
request_receiver,
response_sender.clone(),
exit.clone(),
);
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(DataReplicator { thread_hdls })
}
}
#[cfg(test)]
mod tests {
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use logger;
use packet::Blob;
use rayon::iter::*;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, DataReplicator, UdpSocket) {
let tn = TestNode::new();
let crdt = Crdt::new(tn.data.clone());
let c = Arc::new(RwLock::new(crdt));
let w = Arc::new(RwLock::new(vec![]));
let d = DataReplicator::new(
c.clone(),
w,
tn.sockets.gossip,
tn.sockets.gossip_send,
exit,
).unwrap();
(c, d, tn.sockets.replicate)
}
/// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, DataReplicator, UdpSocket)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect();
topo(&listen);
let mut done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _, _) in listen.iter() {
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
break;
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
for (c, dr, _) in listen.into_iter() {
for j in dr.thread_hdls.into_iter() {
j.join().unwrap();
}
// make it clear what failed
// protocol is to chatty, updates should stop after everyone receives `num`
assert!(c.read().unwrap().update_index <= num as u64);
// protocol is not chatty enough, everyone should get `num` entries
assert_eq!(c.read().unwrap().table.len(), num);
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_integration_ring_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}
/// star a -> (b,c,d,e)
#[test]
fn gossip_integration_star_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut yd = yv.table[&yv.me].clone();
yd.version = 0;
xv.insert(&yd);
trace!("star leader {:?}", &xv.me[..4]);
}
});
}
/// rstar a <- (b,c,d,e)
#[test]
fn gossip_integration_rstar_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
let xd = {
let xv = listen[0].0.read().unwrap();
xv.table[&xv.me].clone()
};
trace!("rstar leader {:?}", &xd.id[..4]);
for n in 0..(num - 1) {
let y = (n + 1) % listen.len();
let mut yv = listen[y].0.write().unwrap();
yv.insert(&xd);
trace!("rstar insert {:?} into {:?}", &xd.id[..4], &yv.me[..4]);
}
});
}
#[test]
pub fn test_crdt_retransmit() {
logger::setup();
let exit = Arc::new(AtomicBool::new(false));
trace!("c1:");
let (c1, dr1, tn1) = test_node(exit.clone());
trace!("c2:");
let (c2, dr2, tn2) = test_node(exit.clone());
trace!("c3:");
let (c3, dr3, tn3) = test_node(exit.clone());
let c1_data = c1.read().unwrap().my_data().clone();
c1.write().unwrap().set_leader(c1_data.id);
c2.write().unwrap().insert(&c1_data);
c3.write().unwrap().insert(&c1_data);
c2.write().unwrap().set_leader(c1_data.id);
c3.write().unwrap().set_leader(c1_data.id);
//wait to converge
trace!("waiting to converge:");
let mut done = false;
for _ in 0..30 {
done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
&& c3.read().unwrap().table.len() == 3;
if done {
break;
}
sleep(Duration::new(1, 0));
}
assert!(done);
let mut b = Blob::default();
b.meta.size = 10;
Crdt::retransmit(&c1, &Arc::new(RwLock::new(b)), &tn1).unwrap();
let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let res = s.recv_from(&mut b.data);
res.is_err() //true if failed to receive the retransmit packet
})
.collect();
//true if failed receive the retransmit packet, r2, and r3 should succeed
//r1 was the sender, so it should fail to receive the packet
assert_eq!(res, [true, false, false]);
exit.store(true, Ordering::Relaxed);
let mut threads = vec![];
threads.extend(dr1.thread_hdls.into_iter());
threads.extend(dr2.thread_hdls.into_iter());
threads.extend(dr3.thread_hdls.into_iter());
for t in threads.into_iter() {
t.join().unwrap();
}
}
}

View File

@ -3,6 +3,7 @@ pub mod bank;
pub mod banking_stage;
pub mod budget;
pub mod crdt;
pub mod data_replicator;
pub mod entry;
pub mod entry_writer;
#[cfg(feature = "erasure")]

View File

@ -180,10 +180,10 @@ impl Packets {
socket.set_nonblocking(false)?;
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
debug!("got {:?} messages", i);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {
@ -250,6 +250,7 @@ pub fn to_blob<T: Serialize>(
// the raw bytes are being serialized and sent, this isn't the
// right interface, and we should create a separate path for
// sending request responses in the RPU
assert!(len < BLOB_SIZE);
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
@ -283,7 +284,8 @@ impl Blob {
self.data[..BLOB_INDEX_END].clone_from_slice(&wtr);
Ok(())
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use ffor spam filtering
pub fn get_id(&self) -> Result<PublicKey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)
@ -317,9 +319,10 @@ impl Blob {
let r = re.allocate();
{
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages", i);
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {

View File

@ -2,6 +2,7 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use hash::Hash;
use packet;
use rpu::Rpu;
@ -51,19 +52,26 @@ impl Server {
thread_hdls.extend(tpu.thread_hdls);
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
crdt.clone(),
window.clone(),
gossip_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
thread_hdls.extend(data_replicator.thread_hdls);
let t_broadcast = streamer::broadcaster(
broadcast_socket,
exit.clone(),
crdt.clone(),
crdt,
window,
blob_recycler.clone(),
tpu.blob_receiver,
);
thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]);
thread_hdls.extend(vec![t_broadcast]);
Server { thread_hdls }
}

View File

@ -106,6 +106,7 @@ pub fn responder(
//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
trace!("receiving on {}", sock.local_addr().unwrap());
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
@ -584,7 +585,6 @@ mod bench {
#[cfg(test)]
mod test {
use crdt::{Crdt, ReplicatedData};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::KeyPair;
use signature::KeyPairUtil;
@ -595,10 +595,9 @@ mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
use streamer::{default_window, BlobReceiver, PacketReceiver};
use streamer::{blob_receiver, receiver, responder, window};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
@ -735,111 +734,4 @@ mod test {
t_responder.join().expect("join");
t_window.join().expect("join");
}
fn test_node() -> (Arc<RwLock<Crdt>>, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
trace!("data: {:?}", d);
let crdt = Crdt::new(d);
(Arc::new(RwLock::new(crdt)), gossip, replicate, serve)
}
#[test]
#[ignore]
//retransmit from leader to replicate target
pub fn retransmit() {
logger::setup();
trace!("retransmit test start");
let exit = Arc::new(AtomicBool::new(false));
let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node();
let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node();
let leader_data = crdt_leader.read().unwrap().my_data().clone();
crdt_leader.write().unwrap().insert(&leader_data);
crdt_leader.write().unwrap().set_leader(leader_data.id);
let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone());
let window_leader = Arc::new(RwLock::new(vec![]));
let t_crdt_leader_l = Crdt::listen(
crdt_leader.clone(),
window_leader,
sock_gossip_leader,
exit.clone(),
);
crdt_target.write().unwrap().insert(&leader_data);
crdt_target.write().unwrap().set_leader(leader_data.id);
let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone());
let window_target = Arc::new(RwLock::new(vec![]));
let t_crdt_target_l = Crdt::listen(
crdt_target.clone(),
window_target,
sock_gossip_target,
exit.clone(),
);
//leader retransmitter
let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = sock_leader.local_addr().unwrap();
let t_retransmit = retransmitter(
sock_leader,
exit.clone(),
crdt_leader.clone(),
blob_recycler.clone(),
r_retransmit,
);
//target receiver
let (s_blob_receiver, r_blob_receiver) = channel();
let t_receiver = blob_receiver(
exit.clone(),
blob_recycler.clone(),
sock_replicate_target,
s_blob_receiver,
).unwrap();
for _ in 0..10 {
let done = crdt_target.read().unwrap().update_index == 2
&& crdt_leader.read().unwrap().update_index == 2;
if done {
break;
}
let timer = Duration::new(1, 0);
sleep(timer);
}
//send the data through
let mut bq = VecDeque::new();
let b = blob_recycler.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_retransmit.send(bq).unwrap();
let timer = Duration::new(5, 0);
trace!("Waiting for timeout");
let mut oq = r_blob_receiver.recv_timeout(timer).unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
let threads = vec![
t_receiver,
t_retransmit,
t_crdt_target_g,
t_crdt_target_l,
t_crdt_leader_g,
t_crdt_leader_l,
];
for t in threads {
t.join().unwrap();
}
}
}

View File

@ -177,18 +177,18 @@ mod tests {
use super::*;
use bank::Bank;
use budget::Budget;
use crdt::TestNode;
use futures::Future;
use logger;
use mint::Mint;
use server::Server;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::sleep;
use std::time::Duration;
use transaction::{Instruction, Plan};
use tvu::TestNode;
#[test]
fn test_thin_client() {

View File

@ -56,7 +56,6 @@ impl Tpu {
Mutex::new(writer),
record_stage.entry_receiver,
);
let mut thread_hdls = vec![
fetch_stage.thread_hdl,
banking_stage.thread_hdl,

View File

@ -22,9 +22,9 @@
use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use data_replicator::DataReplicator;
use packet;
use replicate_stage::ReplicateStage;
use signature::{KeyPair, KeyPairUtil};
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -49,7 +49,7 @@ impl Tvu {
pub fn new(
bank: Arc<Bank>,
me: ReplicatedData,
gossip: UdpSocket,
gossip_listen_socket: UdpSocket,
replicate: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
@ -62,9 +62,15 @@ impl Tvu {
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let window = streamer::default_window();
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let data_replicator = DataReplicator::new(
crdt.clone(),
window.clone(),
gossip_listen_socket,
gossip_send_socket,
exit.clone(),
).expect("DataReplicator::new");
// TODO pull this socket out through the public interface
// make sure we are on the same interface
@ -111,108 +117,52 @@ impl Tvu {
blob_recycler.clone(),
);
let threads = vec![
let mut threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
t_window,
replicate_stage.thread_hdl,
t_gossip,
t_listen,
];
threads.extend(data_replicator.thread_hdls.into_iter());
Tvu {
thread_hdls: threads,
}
}
}
pub struct Sockets {
pub gossip: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
}
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
impl TestNode {
pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
);
TestNode {
data: data,
sockets: Sockets {
gossip,
requests,
replicate,
transaction,
respond,
broadcast,
},
}
}
}
#[cfg(test)]
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
use signature::{KeyPair, KeyPairUtil};
use std::time::Duration;
let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests_socket.local_addr().unwrap(),
transactions_socket.local_addr().unwrap(),
);
(d, gossip, replicate, requests_socket, transactions_socket)
}
#[cfg(test)]
pub mod tests {
use bank::Bank;
use bincode::serialize;
use crdt::Crdt;
use crdt::{Crdt, TestNode};
use data_replicator::DataReplicator;
use entry::Entry;
use hash::{hash, Hash};
use logger;
use mint::Mint;
use packet::BlobRecycler;
use result::Result;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use tvu::{TestNode, Tvu};
use tvu::Tvu;
fn new_replicator(
crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket,
exit: Arc<AtomicBool>,
) -> Result<DataReplicator> {
let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
DataReplicator::new(crdt, window, listen, send_sock, exit)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
fn test_replicate() {
@ -227,9 +177,7 @@ pub mod tests {
crdt_l.set_leader(leader.data.id);
let cref_l = Arc::new(RwLock::new(crdt_l));
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
let window1 = streamer::default_window();
let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone());
let dr_l = new_replicator(cref_l, leader.sockets.gossip, exit.clone()).unwrap();
//start crdt2
let mut crdt2 = Crdt::new(target2.data.clone());
@ -237,9 +185,7 @@ pub mod tests {
crdt2.set_leader(leader.data.id);
let leader_id = leader.data.id;
let cref2 = Arc::new(RwLock::new(crdt2));
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
let window2 = streamer::default_window();
let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone());
let dr_2 = new_replicator(cref2, target2.sockets.gossip, exit.clone()).unwrap();
// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
@ -337,11 +283,13 @@ pub mod tests {
for t in tvu.thread_hdls {
t.join().expect("join");
}
t2_gossip.join().expect("join");
t2_listen.join().expect("join");
for t in dr_l.thread_hdls {
t.join().expect("join");
}
for t in dr_2.thread_hdls {
t.join().expect("join");
}
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_l_gossip.join().expect("join");
t_l_listen.join().expect("join");
}
}

View File

@ -6,14 +6,15 @@ extern crate solana;
use futures::Future;
use solana::bank::Bank;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, ReplicatedData};
use solana::data_replicator::DataReplicator;
use solana::logger;
use solana::mint::Mint;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil, PublicKey};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::tvu::TestNode;
use std::io;
use std::io::sink;
use std::net::UdpSocket;
@ -59,16 +60,15 @@ fn converge(
let mut spy_crdt = Crdt::new(spy.data);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(
let dr = DataReplicator::new(
spy_ref.clone(),
spy_window,
spy.sockets.gossip,
exit.clone(),
);
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
spy.sockets.gossip_send,
exit,
).unwrap();
//wait for the network to converge
let mut converged = false;
for _ in 0..30 {
@ -80,8 +80,7 @@ fn converge(
sleep(Duration::new(1, 0));
}
assert!(converged);
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
threads.extend(dr.thread_hdls.into_iter());
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()