replaced lockable scoped channel variables with lock-free ones

Vladimir Komendantskiy 2018-04-02 21:26:40 +01:00
parent e01a80dfa7
commit 186a855d2f
6 changed files with 95 additions and 91 deletions
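
The gist of the change: the old code shared `spmc::Sender` / `mpsc::Receiver` endpoints between scoped threads by wrapping them in `Arc<Mutex<...>>` and taking a lock around every `send()` and `recv()`. `crossbeam-channel` endpoints are `Sync`, so scoped threads can use them through plain shared references with no locking. A minimal standalone sketch of the new pattern (not code from this commit; crate versions as added to Cargo.toml below):

    extern crate crossbeam;
    extern crate crossbeam_channel;

    use crossbeam_channel as channel;

    fn main() {
        // One unbounded MPMC channel; both endpoints are shared by reference.
        let (tx, rx) = channel::unbounded();
        let (tx, rx) = (&tx, &rx);

        crossbeam::scope(|scope| {
            // Each scoped thread captures a copy of the reference -- no Arc,
            // no Mutex, no .lock().unwrap() around channel operations.
            scope.spawn(move || tx.send("ping").unwrap());
            scope.spawn(move || println!("{}", rx.recv().unwrap()));
        });
    }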


@@ -6,17 +6,14 @@ authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
 [dependencies]
 log = "0.4.1"
 simple_logger = "0.5"
-tokio = "0.1"
-#tokio-io = "0.1"
-#tokio-timer = "0.1"
 reed-solomon-erasure = "3.0"
 merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
 ring = "^0.12"
 rand = "*"
 error-chain = "0.11"
 protobuf = "1.4.4"
-spmc = "0.2.2"
 crossbeam = "0.3.2"
+crossbeam-channel = "0.1"

 [build-dependencies]
 protoc-rust = "1.4.4"


@@ -1,15 +1,14 @@
-//! Reliable broadcast algorithm.
+//! Reliable broadcast algorithm instance.
 use std::fmt::Debug;
 use std::hash::Hash;
 use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
 use crossbeam;
 use proto::*;
 use std::marker::{Send, Sync};
 use merkle::MerkleTree;
 use merkle::proof::{Proof, Lemma, Positioned};
 use reed_solomon_erasure::ReedSolomon;
+use crossbeam_channel as channel;

 /// Temporary placeholders for the number of participants and the maximum
 /// envisaged number of faulty nodes. Only one is required since N >= 3f +
@@ -25,31 +24,30 @@ const PLACEHOLDER_F: usize = 2;
 /// Broadcast stage. See the TODO note below!
 ///
 /// TODO: The ACS algorithm will require multiple broadcast instances running
-/// asynchronously, see Figure 4 in the HBBFT paper. So, it's likely that the
-/// broadcast *stage* has to be replaced with N asynchronous threads, each
-/// responding to values from one particular remote node. The paper doesn't make
-/// it clear though how other messages - Echo and Ready - are distributed over
-/// the instances. Also it appears that the sender of a message has to become
-/// part of the message for this to work.
-pub struct Stage<T: Send + Sync> {
+/// asynchronously, see Figure 4 in the HBBFT paper. Those are N asynchronous
+/// threads, each responding to values from one particular remote node. The
+/// paper doesn't make it clear though how other messages - Echo and Ready - are
+/// distributed over the instances. Also it appears that the sender of a message
+/// might become part of the message for this to work.
+pub struct Instance<'a, T: 'a + Send + Sync> {
     /// The transmit side of the multiple consumer channel to comms threads.
-    pub tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
+    pub tx: &'a channel::Sender<Message<T>>,
     /// The receive side of the multiple producer channel from comms threads.
-    pub rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+    pub rx: &'a channel::Receiver<Message<T>>,
     /// Value to be broadcast.
     pub broadcast_value: Option<T>,
 }

-impl<T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+impl<'a, T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
      + From<Vec<u8>> + AsRef<[u8]>>
-    Stage<T>
+    Instance<'a, T>
 where Vec<u8>: From<T>
 {
-    pub fn new(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
-               rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+    pub fn new(tx: &'a channel::Sender<Message<T>>,
+               rx: &'a channel::Receiver<Message<T>>,
                broadcast_value: Option<T>) -> Self
     {
-        Stage {
+        Instance {
             tx: tx,
             rx: rx,
             broadcast_value: broadcast_value,
@@ -62,12 +60,6 @@ where Vec<u8>: From<T>
     /// TODO: Detailed error status.
     pub fn run(&mut self) -> Result<T, BroadcastError> {
         // Broadcast state machine thread.
-        //
-        // rx cannot be cloned due to its type constraint but can be used inside
-        // a thread with the help of an `Arc` (`Rc` wouldn't work for the same
-        // reason). A `Mutex` is used to grant write access.
-        let rx = self.rx.to_owned();
-        let tx = self.tx.to_owned();
         let bvalue = self.broadcast_value.to_owned();
         let result: Result<T, BroadcastError>;
         let result_r = Arc::new(Mutex::new(None));
@@ -76,7 +68,7 @@ where Vec<u8>: From<T>
         crossbeam::scope(|scope| {
             scope.spawn(move || {
                 *result_r_scoped.lock().unwrap() =
-                    Some(inner_run(tx, rx, bvalue));
+                    Some(inner_run(self.tx, self.rx, bvalue));
             });
         });
         if let Some(ref r) = *result_r.lock().unwrap() {
@@ -98,11 +90,11 @@ pub enum BroadcastError {
 /// Breaks the input value into shards of equal length and encodes them -- and
 /// some extra parity shards -- with a Reed-Solomon erasure coding scheme.
-fn send_shards<T>(value: T,
-                  tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
+fn send_shards<'a, T>(value: T,
+                      tx: &'a channel::Sender<Message<T>>,
                   coding: &ReedSolomon,
                   data_shard_num: usize,
                   parity_shard_num: usize)
 where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
     + From<Vec<u8>> + AsRef<[u8]>
     , Vec<u8>: From<T>
@@ -150,16 +142,16 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
     for leaf_value in mtree.iter().cloned() {
         let proof = mtree.gen_proof(leaf_value);
         if let Some(proof) = proof {
-            tx.lock().unwrap().send(Message::Broadcast(
+            tx.send(Message::Broadcast(
                 BroadcastMessage::Value(proof))).unwrap();
         }
     }
 }

 /// The main loop of the broadcast task.
-fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
-                rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+fn inner_run<'a, T>(tx: &'a channel::Sender<Message<T>>,
+                    rx: &'a channel::Receiver<Message<T>>,
                 broadcast_value: Option<T>) -> Result<T, BroadcastError>
 where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
     + From<Vec<u8>> + AsRef<[u8]>
     , Vec<u8>: From<T>
@@ -175,7 +167,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
     //
     // FIXME: Does the node send a proof to itself?
     if let Some(v) = broadcast_value {
-        send_shards(v, tx.clone(), &coding, data_shard_num, parity_shard_num);
+        send_shards(v, tx, &coding, data_shard_num, parity_shard_num);
     }

     // currently known leaf values
@@ -197,7 +189,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
     // TODO: handle exit conditions
     while result == None {
         // Receive a message from the socket IO task.
-        let message = rx.lock().unwrap().recv().unwrap();
+        let message = rx.recv().unwrap();
         if let Message::Broadcast(message) = message {
             match message {
                 // A value received. Record the value and multicast an echo.
@@ -220,9 +212,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                     }
                 }
                 // Broadcast an echo of this proof.
-                tx.lock().unwrap()
-                    .send(Message::Broadcast(
-                        BroadcastMessage::Echo(p)))
+                tx.send(Message::Broadcast(BroadcastMessage::Echo(p)))
                     .unwrap()
             },
@@ -266,7 +256,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                     // Ready
                     if !ready_sent {
                         ready_sent = true;
-                        tx.lock().unwrap().send(Message::Broadcast(
+                        tx.send(Message::Broadcast(
                             BroadcastMessage::Ready(h.to_owned())))
                             .unwrap();
                     }
@@ -291,7 +281,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                     if (ready_num == PLACEHOLDER_F + 1) &&
                         !ready_sent
                     {
-                        tx.lock().unwrap().send(Message::Broadcast(
+                        tx.send(Message::Broadcast(
                             BroadcastMessage::Ready(h.to_vec()))).unwrap();
                     }
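
Note that one `Arc<Mutex<...>>` intentionally survives in `Instance::run`: it guards only the result slot that the scoped worker thread fills in, not the channels. A standalone sketch of that pattern in isolation (`Some(42)` is a hypothetical stand-in for the `inner_run(self.tx, self.rx, bvalue)` call):

    extern crate crossbeam;

    use std::sync::{Arc, Mutex};

    fn main() {
        let result_slot: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
        let slot = result_slot.clone();
        crossbeam::scope(|scope| {
            scope.spawn(move || {
                // Stand-in for inner_run(self.tx, self.rx, bvalue).
                *slot.lock().unwrap() = Some(42);
            });
        });
        // crossbeam::scope joins the thread before returning, so the result
        // is guaranteed to be present here.
        let result = result_slot.lock().unwrap().take().unwrap();
        println!("result: {}", result);
    }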


@@ -3,33 +3,33 @@
 //! `spmc::channel()` and `mpsc::channel()`.
 use std::fmt::Debug;
 use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
 use crossbeam;
+use crossbeam_channel as channel;
 use proto::Message;
 use task;

 /// A communication task connects a remote node to the thread that manages the
 /// consensus algorithm.
-pub struct CommsTask<'a, T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+pub struct CommsTask<'a, 'b, T: 'a + Send + Sync +
+                     From<Vec<u8>> + Into<Vec<u8>>>
 where Vec<u8>: From<T>
 {
     /// The transmit side of the multiple producer channel from comms threads.
-    tx: mpsc::Sender<Message<T>>,
+    tx: &'a channel::Sender<Message<T>>,
     /// The receive side of the multiple consumer channel to comms threads.
-    rx: spmc::Receiver<Message<T>>,
+    rx: &'a channel::Receiver<Message<T>>,
     /// The socket IO task.
-    task: task::Task<'a>
+    task: task::Task<'b>
 }

-impl<'a, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
-    CommsTask<'a, T>
+impl<'a, 'b, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+    CommsTask<'a, 'b, T>
 where Vec<u8>: From<T>
 {
-    pub fn new(tx: mpsc::Sender<Message<T>>,
-               rx: spmc::Receiver<Message<T>>,
-               stream: &'a ::std::net::TcpStream) -> Self {
+    pub fn new(tx: &'a channel::Sender<Message<T>>,
+               rx: &'a channel::Receiver<Message<T>>,
+               stream: &'b ::std::net::TcpStream) -> Self {
         CommsTask {
             tx: tx,
             rx: rx,
@@ -41,8 +41,8 @@ where Vec<u8>: From<T>
     /// thread requests.
     pub fn run(&mut self) {
         // Borrow parts of `self` before entering the thread binding scope.
-        let tx = Arc::new(&self.tx);
-        let rx = Arc::new(&self.rx);
+        let tx = Arc::new(self.tx);
+        let rx = Arc::new(self.rx);
         let task = Arc::new(Mutex::new(&mut self.task));

         crossbeam::scope(|scope| {
@@ -62,7 +62,7 @@ where Vec<u8>: From<T>
             // Remote comms receive loop.
             loop {
                 match task.lock().unwrap().receive_message() {
-                    Ok(message) => // self.on_message_received(message),
+                    Ok(message) =>
                         tx.send(message).unwrap(),
                     Err(task::Error::ProtobufError(e)) =>
                         warn!("Protobuf error {}", e),
@@ -75,10 +75,4 @@ where Vec<u8>: From<T>
         });
     }
-
-    /// Handler of a received message.
-    fn on_message_received(&mut self, message: Message<T>) {
-        // Forward the message to the manager thread.
-        self.tx.send(message).unwrap();
-    }
 }
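
What the new `CommsTask` signature buys: `&channel::Sender` and `&channel::Receiver` are `Copy`, so each scoped thread inside `run()` can capture its own copy of the reference directly (the remaining `Arc::new(self.tx)` wrapper is arguably no longer needed). A standalone sketch of the two pump loops, with a single loopback channel standing in for the socket and the two real channels (names are illustrative only):

    extern crate crossbeam;
    extern crate crossbeam_channel;

    use crossbeam_channel as channel;

    fn run_comms(tx: &channel::Sender<String>, rx: &channel::Receiver<String>) {
        crossbeam::scope(|scope| {
            // Stand-in for the remote receive loop: socket -> manager.
            scope.spawn(move || {
                tx.send("message from remote node".to_string()).unwrap();
            });
            // Stand-in for the send loop: manager -> socket.
            scope.spawn(move || {
                println!("writing to socket: {}", rx.recv().unwrap());
            });
        });
    }

    fn main() {
        let (tx, rx) = channel::unbounded();
        run_comms(&tx, &rx);
    }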


@@ -2,7 +2,7 @@

 use std::collections::HashSet;
 use std::fmt::Debug;
-use std::io::{Read, Write, BufReader};
+use std::io::BufReader;
 use std::net::{TcpStream, TcpListener, SocketAddr};

 #[derive(Debug)]


@@ -42,9 +42,8 @@ extern crate log;
 extern crate protobuf;
 extern crate ring;
 extern crate merkle;
-//extern crate futures;
-extern crate spmc;
 extern crate crossbeam;
+extern crate crossbeam_channel;
 extern crate reed_solomon_erasure;

 mod connection;


@@ -3,11 +3,9 @@ use std::collections::HashSet;
 use std::fmt::Debug;
 use std::hash::Hash;
 use std::marker::{Send, Sync};
-use std::net::{TcpListener, SocketAddr};
-use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
+use std::net::SocketAddr;
 use crossbeam;
+use crossbeam_channel as channel;

 use connection;
 use broadcast;
@@ -39,38 +37,64 @@ where Vec<u8>: From<T>
     /// Consensus node procedure implementing HoneyBadgerBFT.
     pub fn run(&self) -> Result<T, ()>
     {
-        // Multicast channel from the manager task to comms tasks.
-        let (stx, srx): (spmc::Sender<Message<T>>,
-                         spmc::Receiver<Message<T>>) = spmc::channel();
-        // Unicast channel from comms tasks to the manager task.
-        let (mtx, mrx): (mpsc::Sender<Message<T>>,
-                         mpsc::Receiver<Message<T>>) = mpsc::channel();
+        // Multiple-producer, multiple-consumer channel from comms tasks to
+        // algorithm actor tasks such as Reliable Broadcast.
+        let (from_comms_tx, from_comms_rx):
+            (
+                channel::Sender<Message<T>>,
+                channel::Receiver<Message<T>>
+            ) = channel::unbounded();
+        let (from_comms_tx, from_comms_rx) = (&from_comms_tx, &from_comms_rx);
+        // Multiple-producer, multiple-consumer channel from algorithm actor
+        // tasks such as Reliable Broadcast to comms tasks.
+        let (to_comms_tx, to_comms_rx):
+            (
+                channel::Sender<Message<T>>,
+                channel::Receiver<Message<T>>
+            ) = channel::unbounded();
+        let (to_comms_tx, to_comms_rx) = (&to_comms_tx, &to_comms_rx);
         let broadcast_value = self.value.to_owned();
         let connections = connection::make(&self.addr, &self.remotes);

         // All spawned threads will have exited by the end of the scope.
         crossbeam::scope(|scope| {
-            // FIXME: Compute [i <- connections | v_i].
             // Start a comms task for each connection.
-            for c in connections.iter() {
-                info!("Creating a comms task for {:?}",
+            for (i, c) in connections.iter().enumerate() {
+                // FIXME:
+                //
+                // - Connect the comms task to the broadcast instance.
+                //
+                // - Broadcast v_i through the broadcast instance?
+                info!("Creating a comms task #{} for {:?}", i,
                       c.stream.peer_addr().unwrap());
-                let tx = mtx.clone();
-                let rx = srx.clone();
                 scope.spawn(move || {
-                    commst::CommsTask::new(tx, rx, &c.stream).run();
+                    commst::CommsTask::new(from_comms_tx,
+                                           to_comms_rx,
+                                           &c.stream)
+                        .run();
+                });
+
+                // Associate a broadcast instance to the above comms task.
+                scope.spawn(move || {
+                    match broadcast::Instance::new(to_comms_tx,
+                                                   from_comms_rx,
+                                                   // FIXME
+                                                   None)
+                        .run()
+                    {
+                        Ok(_) => debug!("Broadcast instance #{} succeeded", i),
+                        Err(_) => error!("Broadcast instance #{} failed", i)
+                    }
                 });
             }

-            // broadcast stage
-            let (tx, rx) = (Arc::new(Mutex::new(stx)),
-                            Arc::new(Mutex::new(mrx)));
-            match broadcast::Stage::new(tx, rx, broadcast_value).run() {
-                Ok(_) => debug!("Broadcast stage succeeded"),
-                Err(_) => error!("Broadcast stage failed")
-            }
-            // TODO: other stages
+            // TODO: continue the implementation of the asynchronous common
+            // subset algorithm.
         }); // end of thread scope
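
The resulting topology: one `from_comms` channel fanning in from all comms tasks, and one `to_comms` channel fanning out to them, with broadcast instances on the other side of both. A standalone sketch of that wiring with one comms task and one algorithm task (`u32` stands in for `Message<T>`; names mirror the diff but the bodies are illustrative):

    extern crate crossbeam;
    extern crate crossbeam_channel;

    use crossbeam_channel as channel;

    fn main() {
        let (from_comms_tx, from_comms_rx) = channel::unbounded::<u32>();
        let (to_comms_tx, to_comms_rx) = channel::unbounded::<u32>();
        let (from_comms_tx, from_comms_rx) = (&from_comms_tx, &from_comms_rx);
        let (to_comms_tx, to_comms_rx) = (&to_comms_tx, &to_comms_rx);

        crossbeam::scope(|scope| {
            // Stand-in comms task: deliver one inbound message, then print
            // the reply routed back towards the remote node.
            scope.spawn(move || {
                from_comms_tx.send(1).unwrap();
                println!("comms sends out: {}", to_comms_rx.recv().unwrap());
            });
            // Stand-in broadcast instance: consume from comms, reply to comms.
            scope.spawn(move || {
                let v = from_comms_rx.recv().unwrap();
                to_comms_tx.send(v + 1).unwrap();
            });
        });
    }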