Refactored the comms from the consensus node module

I reduced the socket IO tasks to mere message forwarders. The algorithm
complexity lies in stage modules. Example: broadcast/mod.rs. Communication is
set up and modules are run from the node module.

There is a problem with this commit. std:🧵:spawn imposes a static
lifetime guarantee on type T in Message<T>.
This commit is contained in:
Vladimir Komendantskiy 2018-03-19 17:12:20 +00:00
parent 797f775008
commit c38aad2c0a
8 changed files with 161 additions and 146 deletions

View File

@ -16,6 +16,7 @@ ring = "^0.12"
rand = "*"
error-chain = "0.11"
protobuf = "1.4.4"
spmc = "0.2.2"
[build-dependencies]
protoc-rust = "1.4.4"

View File

@ -1,4 +1,4 @@
//! Binary Byzantine agreement protocol from a common coin protocol.
use futures::{Future, Stream};
use futures::future::*;
//use futures::{Future, Stream};
//use futures::future::*;

View File

@ -1,98 +1,59 @@
//! Reliable broadcast algorithm.
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver};
//use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use std::sync::mpsc;
use spmc;
use std::thread;
use proto::*;
use std::marker::{Send, Sync};
use merkle::*;
pub mod stage;
use self::stage::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T: Send + Sync + 'static> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// The receive end of the comms channel. The transmit end is stored in
/// `stage`.
receiver: Arc<Mutex<Receiver<Message<T>>>>,
/// Shared state of the broadcast stage.
stage: Arc<Mutex<Stage<T>>>
pub struct Stage<T: Send + Sync> {
/// The transmit side of the multiple consumer channel to comms threads.
pub tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
/// The receive side of the multiple producer channel from comms threads.
pub rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
pub values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
pub echos: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
pub readys: HashSet<Vec<u8>>
}
impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
pub fn new(stream: TcpStream,
receiver: Arc<Mutex<Receiver<Message<T>>>>,
stage: Arc<Mutex<Stage<T>>>) -> Self {
BroadcastTask {
task: Task::new(stream),
receiver: receiver,
stage: stage
impl<T: Send + Sync> Stage<T> {
pub fn new(tx: Arc<Mutex<spmc::Sender<Message<T>>>>,
rx: Arc<Mutex<mpsc::Receiver<Message<T>>>>) -> Self {
Stage {
tx: tx,
rx: rx,
values: Default::default(),
echos: Default::default(),
readys: Default::default()
}
}
fn on_message_received(&mut self, message: Message<T>)
-> Result<(), Error>
{
// info!("Message received: {:?}", message);
if let Message::Broadcast(b) = message {
Ok(())
/*
match b {
BroadcastMessage::Value(proof) => {
self.stage.values.insert(proof.root_hash.clone(), proof.clone());
Ok(())
},
BroadcastMessage::Echo(proof) => {
self.echos.insert(proof.root_hash.clone(), proof.clone());
Ok(())
},
BroadcastMessage::Ready(root_hash) => {
self.readys.insert(root_hash);
Ok(())
}
}
*/
}
else {
warn!("Unexpected message type");
return Err(Error::ProtocolError);
}
}
/// Broadcast stage main loop returning the computed values in case of
/// success, and an error in case of failure.
pub fn run(&self) -> Result<Vec<T>, ()> {
let mut aborted = false;
let mut decoded = false;
}
impl<T: Clone + Debug + From<Vec<u8>> + Send + Sync + 'static>
MessageLoop for BroadcastTask<T>
{
fn run(&mut self) {
let rx = self.receiver.clone();
// receiver_thread(rx.lock().unwrap());
::std::thread::spawn(move || {
loop {
let message = rx.lock().unwrap().recv().unwrap();
info!("Received message {:?}",
// self.task.stream.peer_addr().unwrap(),
message);
// Manager thread. rx cannot be cloned due to its type constraint but
// can be used inside a thread with the help of an `Arc` (`Rc` wouldn't
// work for the same reason).
let rx = Arc::new(Mutex::new(self.rx));
let manager = thread::spawn(move || {
while !aborted && !decoded {
// TODO
}
});
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message).unwrap(),
Err(Error::ProtobufError(e)) => warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
//interthread_receiver.join().unwrap();
manager.join().unwrap();
// TODO
Err(())
}
}

View File

@ -1,32 +0,0 @@
//! Broadcast stage of the consensus algorithm.
use std::collections::{HashMap, HashSet};
use std::marker::{Send, Sync};
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;
use merkle::*;
use proto::*;
pub struct Stage<T: Send + Sync> {
/// Tx channels to communicate with all tasks.
pub senders: Vec<Sender<Message<T>>>,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
pub values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
pub echos: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
pub readys: HashSet<Vec<u8>>
}
impl<T: Send + Sync> Stage<T> {
pub fn new(senders: Vec<Sender<Message<T>>>) -> Self {
Stage {
senders: senders,
values: Default::default(),
echos: Default::default(),
readys: Default::default()
}
}
}

73
src/commst.rs Normal file
View File

@ -0,0 +1,73 @@
//! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via
//! `spmc::channel()` and `mpsc::channel()`.
use std::fmt::Debug;
//use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use spmc;
use proto::Message;
use task;
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<T: Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
where Vec<u8>: From<T>
{
/// The transmit side of the multiple producer channel from comms threads.
tx: mpsc::Sender<Message<T>>,
/// The receive side of the multiple consumer channel to comms threads.
rx: spmc::Receiver<Message<T>>,
/// The socket IO task.
task: task::Task
}
impl<T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
CommsTask<T>
where Vec<u8>: From<T>
{
pub fn new(tx: mpsc::Sender<Message<T>>,
rx: spmc::Receiver<Message<T>>,
stream: ::std::net::TcpStream) -> Self {
CommsTask {
tx: tx,
rx: rx,
task: task::Task::new(stream)
}
}
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(&mut self) {
// Local comms receive loop.
let comms = thread::spawn(move || {
loop {
// Receive a message from the manager thread.
let message = self.rx.recv().unwrap();
// Forward the message to the remote node.
self.task.send_message(message).unwrap();
}
});
// Remote comms receive loop.
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message),
Err(task::Error::ProtobufError(e)) =>
warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
comms.join().unwrap();
}
/// Handler of a received message.
fn on_message_received(&mut self, message: Message<T>) {
// Forward the message to the manager thread.
self.tx.send(message).unwrap();
}
}

View File

@ -7,11 +7,13 @@ extern crate log;
extern crate protobuf;
extern crate ring;
extern crate merkle;
extern crate futures;
//extern crate futures;
extern crate spmc;
mod errors;
mod proto;
mod task;
mod commst;
pub mod node;
pub mod broadcast;

View File

@ -1,14 +1,16 @@
//! Networking controls of the consensus node.
use std::sync::mpsc;
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::marker::{Send, Sync};
use std::net::{TcpStream, TcpListener, SocketAddr};
use std::net::{TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver, channel};
use broadcast::*;
use broadcast::stage::Stage as BroadcastStage;
use std::sync::mpsc;
use std::thread;
use spmc;
use broadcast;
//use broadcast::Stage as BroadcastStage;
use proto::Message;
use commst;
/// This is a structure to start a consensus node.
pub struct Node {
@ -23,32 +25,52 @@ impl Node {
Node {addr, remotes}
}
pub fn run<T: Clone + Debug + Send + Sync + 'static>(&self) {
pub fn run<T: Clone + Debug + Send + Sync + From<Vec<u8>>>(&self)
where Vec<u8>: From<T>
{
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(&self.addr).unwrap();
let broadcast_stage: Arc<Mutex<BroadcastStage<T>>> =
Arc::new(Mutex::new(BroadcastStage::new(Vec::new())));
// Multicast channel from the manager task to comms tasks.
let (stx, srx): (spmc::Sender<Message<T>>,
spmc::Receiver<Message<T>>) = spmc::channel();
// Unicast channel from comms tasks to the manager task.
let (mtx, mrx): (mpsc::Sender<Message<T>>,
mpsc::Receiver<Message<T>>) = mpsc::channel();
let comms_threads = Vec::new();
// Listen for incoming socket connections and start a comms task for
// each new connection.
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
let (tx, rx): (Sender<Message<T>>, Receiver<Message<T>>) =
channel();
let rx = Arc::new(Mutex::new(rx));
let stage = Arc::clone(&broadcast_stage);
// Insert the transmit handle connected to this task into
// the shared list of senders.
stage.lock().unwrap().senders.push(tx);
let task = BroadcastTask::new(stream, rx, stage);
let comms_task = commst::CommsTask::new(mtx, srx, stream);
comms_threads.push(thread::spawn(move || {
comms_task.run();
}));
// TODO: spawn a thread for the connected socket
// TODO: break when all the consensus participants have
// joined
}
Err(e) => {
warn!("Failed to connect: {}", e);
}
}
}
// broadcast stage
let stage = broadcast::Stage::new(Arc::new(Mutex::new(stx)),
Arc::new(Mutex::new(mrx)));
let broadcast_result = stage.run();
match broadcast_result {
Ok(v) => unimplemented!(),
Err(e) => error!("Broadcast stage failed")
}
// wait for all threads to finish
for t in comms_threads {
t.join().unwrap();
}
}
}

View File

@ -59,25 +59,13 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
Ok(result)
}
/// A trait allowing custom definitions of the main loop.
pub trait MessageLoop {
fn run(&mut self);
}
pub struct Task {
pub stream: TcpStream,
buffer: [u8; 1024],
}
/// Placeholder `MessageLoop` definition for a generic `Task`.
///
/// TODO: not needed? remove?
impl MessageLoop for Task {
fn run(&mut self) {}
}
/// A message handling task.
impl Task where Self: MessageLoop {
impl Task where {
pub fn new(stream: TcpStream) -> Task {
Task {
stream,