mirror of https://github.com/poanetwork/hbbft.git
removed deadlock in CommsTask::run between the socket reader and writer
This commit is contained in:
parent
6cbfbace1c
commit
6b90917f79
|
@ -11,8 +11,7 @@ use task;
|
|||
|
||||
/// A communication task connects a remote node to the thread that manages the
|
||||
/// consensus algorithm.
|
||||
pub struct CommsTask<'a, 'b, 'c, T: 'a + 'c + Send + Sync +
|
||||
From<Vec<u8>> + Into<Vec<u8>>>
|
||||
pub struct CommsTask<'a, T: 'a + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
|
||||
where Vec<u8>: From<T>
|
||||
{
|
||||
/// The transmit side of the multiple producer channel from comms threads.
|
||||
|
@ -20,21 +19,21 @@ where Vec<u8>: From<T>
|
|||
/// The receive side of the multiple consumer channel to comms threads.
|
||||
rx: &'a channel::Receiver<Message<T>>,
|
||||
/// The receive side of the private channel to the comms thread.
|
||||
rx_priv: &'c channel::Receiver<Message<T>>,
|
||||
rx_priv: &'a channel::Receiver<Message<T>>,
|
||||
/// The socket IO task.
|
||||
task: task::Task<'b>,
|
||||
task: task::Task,
|
||||
/// The index of this comms task for identification against its remote node.
|
||||
pub node_index: usize
|
||||
}
|
||||
|
||||
impl<'a, 'b, 'c, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
|
||||
CommsTask<'a, 'b, 'c, T>
|
||||
impl<'a, T: Debug + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
|
||||
CommsTask<'a, T>
|
||||
where Vec<u8>: From<T>
|
||||
{
|
||||
pub fn new(tx: &'a channel::Sender<(usize, Message<T>)>,
|
||||
rx: &'a channel::Receiver<Message<T>>,
|
||||
rx_priv: &'c channel::Receiver<Message<T>>,
|
||||
stream: &'b ::std::net::TcpStream,
|
||||
rx_priv: &'a channel::Receiver<Message<T>>,
|
||||
stream: ::std::net::TcpStream,
|
||||
node_index: usize) ->
|
||||
Self
|
||||
{
|
||||
|
@ -57,34 +56,38 @@ where Vec<u8>: From<T>
|
|||
let tx = Arc::new(self.tx);
|
||||
let rx = Arc::new(self.rx);
|
||||
let rx_priv = Arc::new(self.rx_priv);
|
||||
let task = Arc::new(Mutex::new(&mut self.task));
|
||||
let mut task1 = self.task.try_clone().unwrap(); // FIXME: handle errors
|
||||
let node_index = self.node_index;
|
||||
|
||||
crossbeam::scope(|scope| {
|
||||
// Make a further copy of `task` for the thread stack.
|
||||
let task1 = task.clone();
|
||||
|
||||
// Local comms receive loop thread.
|
||||
scope.spawn(move || {
|
||||
select_loop! {
|
||||
// Receive a multicast message from the manager thread.
|
||||
recv(rx, message) => {
|
||||
debug!("Node {} <- {:?}", node_index, message);
|
||||
// Forward the message to the remote node.
|
||||
task1.lock().unwrap().send_message(message).unwrap();
|
||||
},
|
||||
// Receive a private message from the manager thread.
|
||||
recv(rx_priv, message) => {
|
||||
debug!("Node {} <- {:?}", node_index, message);
|
||||
// Forward the message to the remote node.
|
||||
task1.lock().unwrap().send_message(message).unwrap();
|
||||
loop {
|
||||
select_loop! {
|
||||
// Receive a multicast message from the manager thread.
|
||||
recv(rx, message) => {
|
||||
debug!("Node {} <- {:?}", node_index, message);
|
||||
// Forward the message to the remote node.
|
||||
task1.send_message(message)
|
||||
.unwrap();
|
||||
debug!("SENT Node {}", node_index);
|
||||
},
|
||||
// Receive a private message from the manager thread.
|
||||
recv(rx_priv, message) => {
|
||||
debug!("Node {} <- {:?}", node_index, message);
|
||||
// Forward the message to the remote node.
|
||||
task1.send_message(message)
|
||||
.unwrap();
|
||||
debug!("SENT Node {}", node_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Remote comms receive loop.
|
||||
loop {
|
||||
match task.lock().unwrap().receive_message() {
|
||||
debug!("Starting remote RX loop for node {}", node_index);
|
||||
match self.task.receive_message() {
|
||||
Ok(message) => {
|
||||
debug!("Node {} -> {:?}", node_index, message);
|
||||
tx.send((node_index, message)).unwrap()
|
||||
|
|
|
@ -104,7 +104,8 @@ where Vec<u8>: From<T>
|
|||
commst::CommsTask::new(from_comms_tx,
|
||||
to_comms_rx,
|
||||
to_comms_1_rx,
|
||||
&c.stream,
|
||||
// FIXME: handle error
|
||||
c.stream.try_clone().unwrap(),
|
||||
node_index)
|
||||
.run();
|
||||
});
|
||||
|
|
15
src/task.rs
15
src/task.rs
|
@ -57,20 +57,27 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result<u32, Error> {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
pub struct Task<'a> {
|
||||
stream: &'a TcpStream,
|
||||
pub struct Task {
|
||||
stream: TcpStream,
|
||||
buffer: [u8; 1024],
|
||||
}
|
||||
|
||||
/// A message handling task.
|
||||
impl<'a> Task<'a> where {
|
||||
pub fn new(stream: &'a TcpStream) -> Task<'a> {
|
||||
impl Task where {
|
||||
pub fn new(stream: TcpStream) -> Task {
|
||||
Task {
|
||||
stream,
|
||||
buffer: [0; 1024]
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> Result<Task, ::std::io::Error> {
|
||||
Ok(Task {
|
||||
stream: self.stream.try_clone()?,
|
||||
buffer: [0; 1024]
|
||||
})
|
||||
}
|
||||
|
||||
pub fn receive_message<T>(&mut self) -> Result<Message<T>, Error>
|
||||
where T: From<Vec<u8>> + Send + Sync
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue