diff --git a/Cargo.toml b/Cargo.toml
index bef5f6a..9390383 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,17 +6,14 @@ authors = ["Vladimir Komendantskiy <komendantsky@gmail.com>"]
 [dependencies]
 log = "0.4.1"
 simple_logger = "0.5"
-tokio = "0.1"
-#tokio-io = "0.1"
-#tokio-timer = "0.1"
 reed-solomon-erasure = "3.0"
 merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
 ring = "^0.12"
 rand = "*"
 error-chain = "0.11"
 protobuf = "1.4.4"
-spmc = "0.2.2"
 crossbeam = "0.3.2"
+crossbeam-channel = "0.1"
 
 [build-dependencies]
 protoc-rust = "1.4.4"
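The dependency change above replaces the single-direction `spmc` and `std::sync::mpsc` channels with `crossbeam-channel`, whose `Sender` and `Receiver` are both cloneable. A minimal sketch of that multiple-producer, multiple-consumer behaviour, using only the `unbounded()`, `send()` and `recv()` calls that the patch itself relies on:

    extern crate crossbeam_channel;

    use crossbeam_channel as channel;
    use std::thread;

    fn main() {
        // One channel serves several producers and several consumers.
        let (tx, rx) = channel::unbounded::<u32>();

        let producers: Vec<_> = (0..3).map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        }).collect();

        // Receivers can be cloned too; each message goes to exactly one.
        let consumer = {
            let rx = rx.clone();
            thread::spawn(move || rx.recv().unwrap())
        };

        for p in producers { p.join().unwrap(); }
        println!("consumer got {}", consumer.join().unwrap());
        // The remaining two messages are still queued on `rx`.
        println!("main got {} and {}", rx.recv().unwrap(), rx.recv().unwrap());
    }
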
diff --git a/src/broadcast/mod.rs b/src/broadcast/mod.rs
index 965a22f..3962ed2 100644
--- a/src/broadcast/mod.rs
+++ b/src/broadcast/mod.rs
@@ -1,15 +1,14 @@
-//! Reliable broadcast algorithm.
+//! Reliable broadcast algorithm instance.
 use std::fmt::Debug;
 use std::hash::Hash;
 use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
 use crossbeam;
 use proto::*;
 use std::marker::{Send, Sync};
 use merkle::MerkleTree;
 use merkle::proof::{Proof, Lemma, Positioned};
 use reed_solomon_erasure::ReedSolomon;
+use crossbeam_channel as channel;
 
 /// Temporary placeholders for the number of participants and the maximum
 /// envisaged number of faulty nodes. Only one is required since N >= 3f +
@@ -25,31 +24,30 @@ const PLACEHOLDER_F: usize = 2;
 /// Broadcast stage. See the TODO note below!
 ///
 /// TODO: The ACS algorithm will require multiple broadcast instances running
-/// asynchronously, see Figure 4 in the HBBFT paper. So, it's likely that the
-/// broadcast *stage* has to be replaced with N asynchronous threads, each
-/// responding to values from one particular remote node. The paper doesn't make
-/// it clear though how other messages - Echo and Ready - are distributed over
-/// the instances. Also it appears that the sender of a message has to become
-/// part of the message for this to work.
-pub struct Stage<T: Send + Sync> {
+/// asynchronously, see Figure 4 in the HBBFT paper. Those are N asynchronous
+/// threads, each responding to values from one particular remote node. The
+/// paper doesn't make it clear though how other messages - Echo and Ready - are
+/// distributed over the instances. Also it appears that the sender of a message
+/// might become part of the message for this to work.
+pub struct Instance<'a, T: 'a + Send + Sync> {
     /// The transmit side of the multiple consumer channel to comms threads.
-    pub tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
+    pub tx: &'a channel::Sender<Message<T>>,
     /// The receive side of the multiple producer channel from comms threads.
-    pub rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+    pub rx: &'a channel::Receiver<Message<T>>,
     /// Value to be broadcast.
     pub broadcast_value: Option<T>,
 }
 
-impl<T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
+impl<'a, T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
      + From<Vec<u8>> + AsRef<[u8]>>
-    Stage<T>
+    Instance<'a, T>
 where Vec<u8>: From<T>
 {
-    pub fn new(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
-               rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
+    pub fn new(tx: &'a channel::Sender<Message<T>>,
+               rx: &'a channel::Receiver<Message<T>>,
                broadcast_value: Option<T>) -> Self
     {
-        Stage {
+        Instance {
             tx: tx,
             rx: rx,
             broadcast_value: broadcast_value,
@@ -62,12 +60,6 @@ where Vec<u8>: From<T>
     /// TODO: Detailed error status.
     pub fn run(&mut self) -> Result<T, BroadcastError> {
         // Broadcast state machine thread.
-        //
-        // rx cannot be cloned due to its type constraint but can be used inside
-        // a thread with the help of an `Arc` (`Rc` wouldn't work for the same
-        // reason). A `Mutex` is used to grant write access.
-        let rx = self.rx.to_owned();
-        let tx = self.tx.to_owned();
         let bvalue = self.broadcast_value.to_owned();
         let result: Result<T, BroadcastError>;
         let result_r = Arc::new(Mutex::new(None));
@@ -76,7 +68,7 @@ where Vec<u8>: From<T>
         crossbeam::scope(|scope| {
             scope.spawn(move || {
                 *result_r_scoped.lock().unwrap() =
-                    Some(inner_run(tx, rx, bvalue));
+                    Some(inner_run(self.tx, self.rx, bvalue));
             });
         });
         if let Some(ref r) = *result_r.lock().unwrap() {
@@ -98,11 +90,11 @@ pub enum BroadcastError {
 
 /// Breaks the input value into shards of equal length and encodes them -- and
 /// some extra parity shards -- with a Reed-Solomon erasure coding scheme.
-fn send_shards<T>(value: T,
-                  tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
-                  coding: &ReedSolomon,
-                  data_shard_num: usize,
-                  parity_shard_num: usize)
+fn send_shards<'a, T>(value: T,
+                      tx: &'a channel::Sender<Message<T>>,
+                      coding: &ReedSolomon,
+                      data_shard_num: usize,
+                      parity_shard_num: usize)
 where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
          + From<Vec<u8>> + AsRef<[u8]>
     , Vec<u8>: From<T>
@@ -150,16 +142,16 @@ where T: Clone + Debug + Send + Sync + Into<Vec<u8>>
     for leaf_value in mtree.iter().cloned() {
         let proof = mtree.gen_proof(leaf_value);
         if let Some(proof) = proof {
-            tx.lock().unwrap().send(Message::Broadcast(
+            tx.send(Message::Broadcast(
                 BroadcastMessage::Value(proof))).unwrap();
         }
     }
 }
 
 /// The main loop of the broadcast task.
-fn inner_run<T>(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
-                rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
-                broadcast_value: Option<T>) -> Result<T, BroadcastError>
+fn inner_run<'a, T>(tx: &'a channel::Sender<Message<T>>,
+                    rx: &'a channel::Receiver<Message<T>>,
+                    broadcast_value: Option<T>) -> Result<T, BroadcastError>
 where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
          + From<Vec<u8>> + AsRef<[u8]>
     , Vec<u8>: From<T>
@@ -175,7 +167,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
     //
     // FIXME: Does the node send a proof to itself?
     if let Some(v) = broadcast_value {
-        send_shards(v, tx.clone(), &coding, data_shard_num, parity_shard_num);
+        send_shards(v, tx, &coding, data_shard_num, parity_shard_num);
     }
 
     // currently known leaf values
@@ -197,7 +189,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
     // TODO: handle exit conditions
     while result == None {
         // Receive a message from the socket IO task.
-        let message = rx.lock().unwrap().recv().unwrap();
+        let message = rx.recv().unwrap();
         if let Message::Broadcast(message) = message {
             match message {
                 // A value received. Record the value and multicast an echo.
@@ -220,9 +212,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                         }
                     }
                     // Broadcast an echo of this proof.
-                    tx.lock().unwrap()
-                        .send(Message::Broadcast(
-                            BroadcastMessage::Echo(p)))
+                    tx.send(Message::Broadcast(BroadcastMessage::Echo(p)))
                         .unwrap()
                 },
 
@@ -266,7 +256,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                         // Ready
                         if !ready_sent {
                             ready_sent = true;
-                            tx.lock().unwrap().send(Message::Broadcast(
+                            tx.send(Message::Broadcast(
                                 BroadcastMessage::Ready(h.to_owned())))
                                 .unwrap();
                         }
@@ -291,7 +281,7 @@ where T: Clone + Debug + Eq + Hash + Send + Sync + Into<Vec<u8>>
                     if (ready_num == PLACEHOLDER_F + 1) &&
                         !ready_sent
                     {
-                        tx.lock().unwrap().send(Message::Broadcast(
+                        tx.send(Message::Broadcast(
                             BroadcastMessage::Ready(h.to_vec()))).unwrap();
                     }
 
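In the main loop above, a node multicasts its own Ready as soon as it has seen `PLACEHOLDER_F + 1` Ready messages for a Merkle root `h` without having sent one yet -- the amplification rule of Bracha-style reliable broadcast. A standalone sketch of that bookkeeping; the names `ReadyTracker` and `on_ready` are illustrative only, and plain byte vectors stand in for Merkle roots:

    use std::collections::HashMap;

    const F: usize = 2; // maximum number of faulty nodes, as PLACEHOLDER_F

    struct ReadyTracker {
        /// Number of Ready messages seen per Merkle root.
        ready_num: HashMap<Vec<u8>, usize>,
        ready_sent: bool,
    }

    impl ReadyTracker {
        fn new() -> Self {
            ReadyTracker { ready_num: HashMap::new(), ready_sent: false }
        }

        /// Records a Ready for root `h`. Returns `true` when the amplification
        /// rule fires: F + 1 Readys for the same root and none sent by us yet.
        fn on_ready(&mut self, h: &[u8]) -> bool {
            let n = self.ready_num.entry(h.to_vec()).or_insert(0);
            *n += 1;
            if *n == F + 1 && !self.ready_sent {
                self.ready_sent = true;
                return true; // caller multicasts Ready(h)
            }
            false
        }
    }

    fn main() {
        let mut tracker = ReadyTracker::new();
        let root = b"merkle-root".to_vec();
        assert!(!tracker.on_ready(&root));
        assert!(!tracker.on_ready(&root));
        assert!(tracker.on_ready(&root)); // third Ready == F + 1: amplify
    }
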
diff --git a/src/commst.rs b/src/commst.rs
index 1f75a03..c91b795 100644
--- a/src/commst.rs
+++ b/src/commst.rs
@@ -3,33 +3,33 @@
 //! `spmc::channel()` and `mpsc::channel()`.
 
 use std::fmt::Debug;
 use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
 use crossbeam;
+use crossbeam_channel as channel;
 use proto::Message;
 use task;
 
 /// A communication task connects a remote node to the thread that manages the
 /// consensus algorithm.
-pub struct CommsTask<'a, T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+pub struct CommsTask<'a, 'b, T: 'a + Send + Sync +
+                     From<Vec<u8>> + Into<Vec<u8>>>
 where Vec<u8>: From<T>
 {
     /// The transmit side of the multiple producer channel from comms threads.
-    tx: mpsc::Sender<Message<T>>,
+    tx: &'a channel::Sender<Message<T>>,
     /// The receive side of the multiple consumer channel to comms threads.
-    rx: spmc::Receiver<Message<T>>,
+    rx: &'a channel::Receiver<Message<T>>,
     /// The socket IO task.
-    task: task::Task<'a>
+    task: task::Task<'b>
 }
 
-impl<'a, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
-    CommsTask<'a, T>
+impl<'a, 'b, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
+    CommsTask<'a, 'b, T>
 where Vec<u8>: From<T>
 {
-    pub fn new(tx: mpsc::Sender<Message<T>>,
-               rx: spmc::Receiver<Message<T>>,
-               stream: &'a ::std::net::TcpStream) -> Self {
+    pub fn new(tx: &'a channel::Sender<Message<T>>,
+               rx: &'a channel::Receiver<Message<T>>,
+               stream: &'b ::std::net::TcpStream) -> Self {
         CommsTask {
             tx: tx,
             rx: rx,
@@ -41,8 +41,8 @@ where Vec<u8>: From<T>
     /// thread requests.
     pub fn run(&mut self) {
         // Borrow parts of `self` before entering the thread binding scope.
-        let tx = Arc::new(&self.tx);
-        let rx = Arc::new(&self.rx);
+        let tx = Arc::new(self.tx);
+        let rx = Arc::new(self.rx);
         let task = Arc::new(Mutex::new(&mut self.task));
 
         crossbeam::scope(|scope| {
@@ -62,7 +62,7 @@ where Vec<u8>: From<T>
             // Remote comms receive loop.
             loop {
                 match task.lock().unwrap().receive_message() {
-                    Ok(message) => // self.on_message_received(message),
+                    Ok(message) => tx.send(message).unwrap(),
                     Err(task::Error::ProtobufError(e)) =>
                         warn!("Protobuf error {}", e),
 
@@ -75,10 +75,4 @@ where Vec<u8>: From<T>
         });
     }
-
-    /// Handler of a received message.
-    fn on_message_received(&mut self, message: Message<T>) {
-        // Forward the message to the manager thread.
-        self.tx.send(message).unwrap();
-    }
 }
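`CommsTask::run` bridges two directions at once: a scoped thread forwards messages arriving on `rx` to the socket, while the receive loop forwards socket messages into `tx`. A self-contained sketch of that shape, with a third channel pair standing in for the `TcpStream` (the `wire_*` names are illustrative), and one message per direction so the loops terminate cleanly:

    extern crate crossbeam;
    extern crate crossbeam_channel;

    use crossbeam_channel as channel;

    fn main() {
        let (to_comms_tx, to_comms_rx) = channel::unbounded::<String>();
        let (from_comms_tx, from_comms_rx) = channel::unbounded::<String>();
        // Stand-in for the socket.
        let (wire_tx, wire_rx) = channel::unbounded::<String>();

        crossbeam::scope(|scope| {
            // Transmit loop: algorithm -> wire.
            scope.spawn(|| {
                let m = to_comms_rx.recv().unwrap();
                wire_tx.send(m).unwrap();
            });
            // Receive loop: wire -> algorithm.
            scope.spawn(|| {
                let m = wire_rx.recv().unwrap();
                from_comms_tx.send(m).unwrap();
            });

            to_comms_tx.send("ping".to_string()).unwrap();
            // The message comes back through both halves of the bridge.
            println!("round trip: {}", from_comms_rx.recv().unwrap());
        });
    }
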
diff --git a/src/connection.rs b/src/connection.rs
index b6acfe4..d81e528 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -2,7 +2,7 @@
 
 use std::collections::HashSet;
 use std::fmt::Debug;
-use std::io::{Read, Write, BufReader};
+use std::io::BufReader;
 use std::net::{TcpStream, TcpListener, SocketAddr};
 
 #[derive(Debug)]
diff --git a/src/lib.rs b/src/lib.rs
index deecd48..ee84ea5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -42,9 +42,8 @@ extern crate log;
 extern crate protobuf;
 extern crate ring;
 extern crate merkle;
-//extern crate futures;
-extern crate spmc;
 extern crate crossbeam;
+extern crate crossbeam_channel;
 extern crate reed_solomon_erasure;
 
 mod connection;
diff --git a/src/node.rs b/src/node.rs
index 59656e2..351a6f5 100644
--- a/src/node.rs
+++ b/src/node.rs
@@ -3,11 +3,9 @@ use std::collections::HashSet;
 use std::fmt::Debug;
 use std::hash::Hash;
 use std::marker::{Send, Sync};
-use std::net::{TcpListener, SocketAddr};
-use std::sync::{Arc, Mutex};
-use std::sync::mpsc;
-use spmc;
+use std::net::SocketAddr;
 use crossbeam;
+use crossbeam_channel as channel;
 
 use connection;
 use broadcast;
@@ -39,38 +37,64 @@ where Vec<u8>: From<T>
 
     /// Consensus node procedure implementing HoneyBadgerBFT.
     pub fn run(&self) -> Result<T, ()> {
-        // Multicast channel from the manager task to comms tasks.
-        let (stx, srx): (spmc::Sender<Message<T>>,
-                         spmc::Receiver<Message<T>>) = spmc::channel();
-        // Unicast channel from comms tasks to the manager task.
-        let (mtx, mrx): (mpsc::Sender<Message<T>>,
-                         mpsc::Receiver<Message<T>>) = mpsc::channel();
+        // Multiple-producer, multiple-consumer channel from comms tasks to
+        // algorithm actor tasks such as Reliable Broadcast.
+        let (from_comms_tx, from_comms_rx):
+        (
+            channel::Sender<Message<T>>,
+            channel::Receiver<Message<T>>
+        ) = channel::unbounded();
+        let (from_comms_tx, from_comms_rx) = (&from_comms_tx, &from_comms_rx);
+
+        // Multiple-producer, multiple-consumer channel from algorithm actor
+        // tasks such as Reliable Broadcast to comms tasks.
+        let (to_comms_tx, to_comms_rx):
+        (
+            channel::Sender<Message<T>>,
+            channel::Receiver<Message<T>>
+        ) = channel::unbounded();
+        let (to_comms_tx, to_comms_rx) = (&to_comms_tx, &to_comms_rx);
+
         let broadcast_value = self.value.to_owned();
         let connections = connection::make(&self.addr, &self.remotes);
 
         // All spawned threads will have exited by the end of the scope.
         crossbeam::scope(|scope| {
+            // FIXME: Compute [i <- connections | v_i].
+
             // Start a comms task for each connection.
-            for c in connections.iter() {
-                info!("Creating a comms task for {:?}",
+            for (i, c) in connections.iter().enumerate() {
+                // FIXME:
+                //
+                // - Connect the comms task to the broadcast instance.
+                //
+                // - Broadcast v_i through the broadcast instance?
+
+                info!("Creating a comms task #{} for {:?}", i,
                       c.stream.peer_addr().unwrap());
-                let tx = mtx.clone();
-                let rx = srx.clone();
                 scope.spawn(move || {
-                    commst::CommsTask::new(tx, rx, &c.stream).run();
+                    commst::CommsTask::new(from_comms_tx,
+                                           to_comms_rx,
+                                           &c.stream)
+                        .run();
+                });
+
+                // Associate a broadcast instance to the above comms task.
+                scope.spawn(move || {
+                    match broadcast::Instance::new(to_comms_tx,
+                                                   from_comms_rx,
+                                                   // FIXME
+                                                   None)
+                        .run()
+                    {
+                        Ok(_) => debug!("Broadcast instance #{} succeeded", i),
+                        Err(_) => error!("Broadcast instance #{} failed", i)
+                    }
                 });
             }
 
-            // broadcast stage
-            let (tx, rx) = (Arc::new(Mutex::new(stx)),
-                            Arc::new(Mutex::new(mrx)));
-            match broadcast::Stage::new(tx, rx, broadcast_value).run() {
-                Ok(_) => debug!("Broadcast stage succeeded"),
-                Err(_) => error!("Broadcast stage failed")
-            }
-
-            // TODO: other stages
+            // TODO: continue the implementation of the asynchronous common
+            // subset algorithm.
         }); // end of thread scope
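With the two channels above shared by every comms task and every broadcast instance, each queued message is consumed by exactly one receiver -- which is what the routing FIXMEs in node.rs are about. A sketch of that MPMC property, reusing the patch's trick of reborrowing a channel half as a reference so that `move` closures only copy a `&Receiver`; the example is illustrative and not part of the patch:

    extern crate crossbeam;
    extern crate crossbeam_channel;

    use crossbeam_channel as channel;

    fn main() {
        let (tx, rx) = channel::unbounded::<u32>();
        // Reborrow so each `move` closure copies only a `&Receiver`.
        let rx = &rx;

        crossbeam::scope(|scope| {
            // Two "instances" competing for messages on the same receiver.
            for i in 0..2 {
                scope.spawn(move || {
                    let m = rx.recv().unwrap();
                    println!("instance #{} got message {}", i, m);
                });
            }
            // Each instance gets exactly one of these, but which one is
            // nondeterministic -- hence the FIXME about routing messages
            // to the right broadcast instance.
            tx.send(10).unwrap();
            tx.send(20).unwrap();
        });
    }
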