lifted the static requirement for messages by using scoped threads

This commit is contained in:
Vladimir Komendantskiy 2018-03-20 16:32:19 +00:00
parent c38aad2c0a
commit fc475004f0
6 changed files with 83 additions and 66 deletions

View File

@ -9,7 +9,6 @@ simple_logger = "0.5"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.1"
futures = "0.1"
reed-solomon-erasure = "3.0"
merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" }
ring = "^0.12"
@ -17,6 +16,7 @@ rand = "*"
error-chain = "0.11"
protobuf = "1.4.4"
spmc = "0.2.2"
crossbeam = "0.3.2"
[build-dependencies]
protoc-rust = "1.4.4"

View File

@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use spmc;
use std::thread;
use crossbeam;
use proto::*;
use std::marker::{Send, Sync};
use merkle::*;
@ -42,17 +42,20 @@ impl<T: Send + Sync> Stage<T> {
let mut aborted = false;
let mut decoded = false;
// Manager thread. rx cannot be cloned due to its type constraint but
// can be used inside a thread with the help of an `Arc` (`Rc` wouldn't
// Manager thread.
//
// rx cannot be cloned due to its type constraint but can be used
// inside a thread with the help of an `Arc` (`Rc` wouldn't
// work for the same reason).
let rx = Arc::new(Mutex::new(self.rx));
let manager = thread::spawn(move || {
while !aborted && !decoded {
// TODO
}
let rx = &self.rx.clone();
crossbeam::scope(|scope| {
scope.spawn(move || {
while !aborted && !decoded {
rx;
// TODO
}
});
});
manager.join().unwrap();
// TODO
Err(())
}

View File

@ -2,10 +2,11 @@
//! socket. Local communication with coordinating threads is made via
//! `spmc::channel()` and `mpsc::channel()`.
use std::fmt::Debug;
//use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use spmc;
use crossbeam;
use proto::Message;
use task;
@ -39,30 +40,40 @@ where Vec<u8>: From<T>
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(&mut self) {
// Local comms receive loop.
let comms = thread::spawn(move || {
// Borrow parts of `self` before entering the thread binding scope.
let tx = Arc::new(&self.tx);
let rx = Arc::new(&self.rx);
let task = Arc::new(Mutex::new(&mut self.task));
crossbeam::scope(|scope| {
// Make a further copy of `task` for the thread stack.
let task1 = task.clone();
// Local comms receive loop thread.
scope.spawn(move || {
loop {
// Receive a message from the manager thread.
let message = rx.recv().unwrap();
// Forward the message to the remote node.
task1.lock().unwrap().send_message(message).unwrap();
}
});
// Remote comms receive loop.
loop {
// Receive a message from the manager thread.
let message = self.rx.recv().unwrap();
// Forward the message to the remote node.
self.task.send_message(message).unwrap();
match task.lock().unwrap().receive_message() {
Ok(message) => // self.on_message_received(message),
tx.send(message).unwrap(),
Err(task::Error::ProtobufError(e)) =>
warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
});
// Remote comms receive loop.
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message),
Err(task::Error::ProtobufError(e)) =>
warn!("Protobuf error {}", e),
Err(e) => {
warn!("Critical error {:?}", e);
break;
}
}
}
comms.join().unwrap();
}
/// Handler of a received message.

View File

@ -9,6 +9,7 @@ extern crate ring;
extern crate merkle;
//extern crate futures;
extern crate spmc;
extern crate crossbeam;
mod errors;
mod proto;

View File

@ -5,10 +5,10 @@ use std::marker::{Send, Sync};
use std::net::{TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use spmc;
use crossbeam;
use broadcast;
//use broadcast::Stage as BroadcastStage;
use proto::Message;
use commst;
@ -36,41 +36,43 @@ impl Node {
// Unicast channel from comms tasks to the manager task.
let (mtx, mrx): (mpsc::Sender<Message<T>>,
mpsc::Receiver<Message<T>>) = mpsc::channel();
let comms_threads = Vec::new();
// Listen for incoming socket connections and start a comms task for
// each new connection.
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
let comms_task = commst::CommsTask::new(mtx, srx, stream);
comms_threads.push(thread::spawn(move || {
comms_task.run();
}));
// All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| {
// TODO: break when all the consensus participants have
// joined
}
Err(e) => {
warn!("Failed to connect: {}", e);
// Listen for incoming socket connections and start a comms task for
// each new connection.
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
let tx = mtx.clone();
let rx = srx.clone();
scope.spawn(move || {
commst::CommsTask::new(tx, rx, stream).run();
});
// TODO: break when all the consensus participants have
// joined
}
Err(e) => {
warn!("Failed to connect: {}", e);
}
}
}
}
// broadcast stage
let stage = broadcast::Stage::new(Arc::new(Mutex::new(stx)),
Arc::new(Mutex::new(mrx)));
let broadcast_result = stage.run();
match broadcast_result {
Ok(v) => unimplemented!(),
Err(e) => error!("Broadcast stage failed")
}
// broadcast stage
let (tx, rx) = (Arc::new(Mutex::new(stx)), Arc::new(Mutex::new(mrx)));
let stage = broadcast::Stage::new(tx, rx);
let broadcast_result = stage.run();
match broadcast_result {
Ok(v) => unimplemented!(),
Err(e) => error!("Broadcast stage failed")
}
// wait for all threads to finish
for t in comms_threads {
t.join().unwrap();
}
// TODO: other stages
}); // end of thread scope
}
}

View File

@ -60,7 +60,7 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
}
pub struct Task {
pub stream: TcpStream,
stream: TcpStream,
buffer: [u8; 1024],
}