diff --git a/src/commst.rs b/src/commst.rs index 57945fc..f431f30 100644 --- a/src/commst.rs +++ b/src/commst.rs @@ -11,8 +11,7 @@ use task; /// A communication task connects a remote node to the thread that manages the /// consensus algorithm. -pub struct CommsTask<'a, 'b, 'c, T: 'a + 'c + Send + Sync + - From> + Into>> +pub struct CommsTask<'a, T: 'a + Send + Sync + From> + Into>> where Vec: From { /// The transmit side of the multiple producer channel from comms threads. @@ -20,21 +19,21 @@ where Vec: From /// The receive side of the multiple consumer channel to comms threads. rx: &'a channel::Receiver>, /// The receive side of the private channel to the comms thread. - rx_priv: &'c channel::Receiver>, + rx_priv: &'a channel::Receiver>, /// The socket IO task. - task: task::Task<'b>, + task: task::Task, /// The index of this comms task for identification against its remote node. pub node_index: usize } -impl<'a, 'b, 'c, T: Debug + Send + Sync + From> + Into>> - CommsTask<'a, 'b, 'c, T> +impl<'a, T: Debug + Send + Sync + From> + Into>> + CommsTask<'a, T> where Vec: From { pub fn new(tx: &'a channel::Sender<(usize, Message)>, rx: &'a channel::Receiver>, - rx_priv: &'c channel::Receiver>, - stream: &'b ::std::net::TcpStream, + rx_priv: &'a channel::Receiver>, + stream: ::std::net::TcpStream, node_index: usize) -> Self { @@ -57,34 +56,38 @@ where Vec: From let tx = Arc::new(self.tx); let rx = Arc::new(self.rx); let rx_priv = Arc::new(self.rx_priv); - let task = Arc::new(Mutex::new(&mut self.task)); + let mut task1 = self.task.try_clone().unwrap(); // FIXME: handle errors let node_index = self.node_index; crossbeam::scope(|scope| { - // Make a further copy of `task` for the thread stack. - let task1 = task.clone(); - // Local comms receive loop thread. scope.spawn(move || { - select_loop! { - // Receive a multicast message from the manager thread. - recv(rx, message) => { - debug!("Node {} <- {:?}", node_index, message); - // Forward the message to the remote node. - task1.lock().unwrap().send_message(message).unwrap(); - }, - // Receive a private message from the manager thread. - recv(rx_priv, message) => { - debug!("Node {} <- {:?}", node_index, message); - // Forward the message to the remote node. - task1.lock().unwrap().send_message(message).unwrap(); + loop { + select_loop! { + // Receive a multicast message from the manager thread. + recv(rx, message) => { + debug!("Node {} <- {:?}", node_index, message); + // Forward the message to the remote node. + task1.send_message(message) + .unwrap(); + debug!("SENT Node {}", node_index); + }, + // Receive a private message from the manager thread. + recv(rx_priv, message) => { + debug!("Node {} <- {:?}", node_index, message); + // Forward the message to the remote node. + task1.send_message(message) + .unwrap(); + debug!("SENT Node {}", node_index); + } } } }); // Remote comms receive loop. loop { - match task.lock().unwrap().receive_message() { + debug!("Starting remote RX loop for node {}", node_index); + match self.task.receive_message() { Ok(message) => { debug!("Node {} -> {:?}", node_index, message); tx.send((node_index, message)).unwrap() diff --git a/src/node.rs b/src/node.rs index 6c6d08a..5fe4c24 100644 --- a/src/node.rs +++ b/src/node.rs @@ -104,7 +104,8 @@ where Vec: From commst::CommsTask::new(from_comms_tx, to_comms_rx, to_comms_1_rx, - &c.stream, + // FIXME: handle error + c.stream.try_clone().unwrap(), node_index) .run(); }); diff --git a/src/task.rs b/src/task.rs index ac0f139..3eadbe5 100644 --- a/src/task.rs +++ b/src/task.rs @@ -57,20 +57,27 @@ fn decode_u32_from_be(buffer: &[u8]) -> Result { Ok(result) } -pub struct Task<'a> { - stream: &'a TcpStream, +pub struct Task { + stream: TcpStream, buffer: [u8; 1024], } /// A message handling task. -impl<'a> Task<'a> where { - pub fn new(stream: &'a TcpStream) -> Task<'a> { +impl Task where { + pub fn new(stream: TcpStream) -> Task { Task { stream, buffer: [0; 1024] } } + pub fn try_clone(&self) -> Result { + Ok(Task { + stream: self.stream.try_clone()?, + buffer: [0; 1024] + }) + } + pub fn receive_message(&mut self) -> Result, Error> where T: From> + Send + Sync {